executor.js 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690
  /**
   * VL Workflow Engine — Step Executor
   * Ported from Go: workflow/executor.go
   * Spec: v3.16
   *
   * Handles all 12 step types: Service, API, Component, LLM, Set, Write,
   * Download, Unzip, Pause, Branch, Loop, Stop (+ Noop)
   *
   * v3.16 additions:
   * - Loop `while` mode (condition loop, mutually exclusive with `source`)
   * - `BREAK` keyword in Loop children (exits entire loop)
   * - LLM `model` field: `<provider>/<modelId>` / `<provider>` / omitted
   */
  14. const crypto = require('crypto');
  15. const { ExpressionEvaluator, toBool } = require('./expression');
  16. const { RunEventType, WriteMode, ExecutionStatus, isV310OrLater, buildErrorMap } = require('./types');
  17. // ─── Service_* ───────────────────────────────────────────────
  /**
   * Execute a `Service_*` step.
   *
   * The service name is the step id with the `Service_` prefix removed. The
   * evaluated `step.in` is passed to `ctx.serviceAdapter.call`, and the response
   * is exposed as `_result` while `step.out` is applied.
   *
   * @param {object} engine - Engine instance, forwarded to `applyOutputMapping`.
   * @param {object} ctx - Execution context (variables and adapters).
   * @param {object} step - Step definition (`id`, optional `in` / `out`).
   * @returns {Promise<void>}
   */
  async function executeServiceStep(engine, ctx, step) {
    const serviceName = step.id.replace(/^Service_/, '');
    const ev = new ExpressionEvaluator(ctx);
    const params = ev.evaluateDeep(step.in || {});
    const result = await ctx.serviceAdapter.call(serviceName, params);
    // Unwrap `data` when it is present. `??` (rather than `||`) keeps
    // falsy-but-valid payloads such as 0, '' or false instead of falling
    // back to the whole response envelope.
    ctx.setVariable('_result', result?.data ?? result);
    await applyOutputMapping(engine, ctx, step);
    ctx.setVariable('_result', undefined);
  }
  27. // ─── Component_* ─────────────────────────────────────────────
  /**
   * Run a `Component_*` step: call the component adapter with the evaluated
   * inputs and map its raw result through `step.out`.
   *
   * @param {object} engine - Engine instance, forwarded to `applyOutputMapping`.
   * @param {object} ctx - Execution context (variables and adapters).
   * @param {object} step - Step definition (`id`, optional `in` / `out`).
   * @returns {Promise<void>}
   */
  async function executeComponentStep(engine, ctx, step) {
    // The component id is the step id without its type prefix.
    const targetComponent = step.id.replace(/^Component_/, '');
    const evaluator = new ExpressionEvaluator(ctx);
    const inputs = evaluator.evaluateDeep(step.in || {});

    const output = await ctx.componentAdapter.call(targetComponent, inputs);

    // `_result` only exists for the duration of the output mapping.
    ctx.setVariable('_result', output);
    await applyOutputMapping(engine, ctx, step);
    ctx.setVariable('_result', undefined);
  }
  37. // ─── LLM_* ───────────────────────────────────────────────────
  /**
   * Execute an `LLM_*` step.
   *
   * Flow: evaluate `step.in`, apply the optional `step.model` override, inject
   * docs, resolve a schema reference, call `ctx.llmAdapter.call`, emit
   * `LLMDone` / `LLMError`, then publish `_result` (and `_meta` on v3.10+)
   * for the duration of the output mapping.
   *
   * @param {object} engine - Engine instance (registry lookup, output mapping).
   * @param {object} ctx - Execution context (variables, adapters, version).
   * @param {object} step - Step definition (`id`, `in`, `out`, optional `model`).
   * @returns {Promise<void>}
   * @throws Rethrows whatever `ctx.llmAdapter.call` rejects with, after the
   *   `LLMError` event has been emitted.
   */
  async function executeLLMStep(engine, ctx, step) {
    const ev = new ExpressionEvaluator(ctx);
    const params = ev.evaluateDeep(step.in || {});

    // v3.16 — LLM model field: <provider>/<modelId>, <provider> only, or omitted.
    // Split on the FIRST slash only: a model id may itself contain slashes
    // ("a/b/c" -> provider "a", model "b/c"). Splitting on every slash made
    // such specs match neither branch and get silently ignored.
    if (step.model) {
      const slashAt = step.model.indexOf('/');
      if (slashAt < 0) {
        // modelId omitted — adapter picks default for provider
        params._provider = step.model;
      } else {
        params._provider = step.model.slice(0, slashAt);
        params.model = step.model.slice(slashAt + 1);
      }
    }

    // Inject docs into system message
    await injectDocs(ctx, params);
    // Resolve schema references
    resolveSchemaRef(engine, params);

    const isStreaming = params.stream === true;

    // Extended streaming callbacks — separate thinking, response, tool_use, tool_result
    const callbacks = isStreaming ? {
      onToken: (delta) => {
        emitEvent(ctx, RunEventType.LLMToken, step.id, { delta });
      },
      onThinking: (delta) => {
        emitEvent(ctx, RunEventType.LLMThinking, step.id, { delta });
      },
      onToolUse: (toolUse) => {
        emitEvent(ctx, RunEventType.LLMToolUse, step.id, {
          tool_use_id: toolUse.id,
          name: toolUse.name,
          input: toolUse.input
        });
      },
      onToolResult: (toolResult) => {
        emitEvent(ctx, RunEventType.LLMToolResult, step.id, {
          tool_use_id: toolResult.tool_use_id,
          content: toolResult.content,
          is_error: toolResult.is_error || false
        });
      }
    } : null;

    // Backward-compatible: pass both onToken (legacy) and callbacks (extended)
    const onToken = callbacks?.onToken || null;

    const callStart = Date.now();
    let result;
    let error;
    try {
      result = await ctx.llmAdapter.call(params, onToken, callbacks);
    } catch (e) {
      // Captured rather than rethrown here so latency is measured the same
      // way for both success and failure.
      error = e;
    }
    const latencyMs = Date.now() - callStart;

    if (error) {
      emitEvent(ctx, RunEventType.LLMError, step.id, {
        error: error.message,
        type: error.type || 'unknown',
        code: error.code || '',
        retryable: error.retryable || false,
        latency_ms: latencyMs
      });
      // On v3.10+ `_meta` is left set on failure (it is not cleared before the throw).
      if (isV310OrLater(ctx.version)) {
        ctx.setVariable('_meta', buildMetaFromError(error, latencyMs));
      }
      throw error;
    }

    // Emit llm_done. Both `input_tokens`/`output_tokens` and
    // `prompt_tokens`/`completion_tokens` usage field names are accepted.
    const usage = result.usage || {};
    emitEvent(ctx, RunEventType.LLMDone, step.id, {
      latency_ms: latencyMs,
      finish_reason: result.finish_reason || result.stop_reason || 'stop',
      model: result.model || '',
      usage: {
        input_tokens: usage.input_tokens || usage.prompt_tokens || 0,
        output_tokens: usage.output_tokens || usage.completion_tokens || 0,
        total_tokens: (usage.input_tokens || usage.prompt_tokens || 0) + (usage.output_tokens || usage.completion_tokens || 0)
      },
      // Include thinking summary if available.
      // NOTE(review): falling back to cache_creation_input_tokens looks
      // unrelated to thinking tokens — confirm this is intended.
      thinking_tokens: usage.thinking_tokens || usage.cache_creation_input_tokens || 0,
      has_tool_use: !!(result.tool_calls || result.tool_use || []).length
    });

    // Apply result
    if (isV310OrLater(ctx.version)) {
      // v3.10+: _result = content only, _meta = metadata
      let content = result.content || '';
      // Unwrap json_object / json_schema structured output
      const formatType = params.output_config?.format?.type || params.output_config?.type;
      if (typeof content === 'string' && (formatType === 'json_object' || formatType === 'json_schema')) {
        // Best-effort: content that is not valid JSON is kept as the raw string.
        try { content = JSON.parse(content); } catch {}
      }
      ctx.setVariable('_result', content);
      ctx.setVariable('_meta', buildMeta(result, latencyMs));
    } else {
      // Pre-v3.10: _result = the full adapter result
      ctx.setVariable('_result', result);
    }

    await applyOutputMapping(engine, ctx, step);
    ctx.setVariable('_result', undefined);
    if (isV310OrLater(ctx.version)) ctx.setVariable('_meta', undefined);
  }
  /**
   * Build the `_meta` object for a successful LLM call.
   *
   * The provider is guessed from substrings of the reported model name
   * ("claude" -> anthropic; "gpt" / "o1" / "o3" / "o4" -> openai), otherwise
   * it is "unknown".
   *
   * @param {object} result - Adapter result (`model`, `usage`, `finish_reason`, ...).
   * @param {number} latencyMs - Measured call latency in milliseconds.
   * @returns {object} Metadata with latency, model, provider, finish reason,
   *   response id and normalized token usage.
   */
  function buildMeta(result, latencyMs) {
    const model = result.model || '';

    const openAiMarkers = ['gpt', 'o1', 'o3', 'o4'];
    let provider;
    if (model.includes('claude')) {
      provider = 'anthropic';
    } else if (openAiMarkers.some((marker) => model.includes(marker))) {
      provider = 'openai';
    } else {
      provider = 'unknown';
    }

    // Accept either naming scheme for the token counters.
    const usage = result.usage || {};
    const inputTokens = usage.input_tokens || usage.prompt_tokens || 0;
    const outputTokens = usage.output_tokens || usage.completion_tokens || 0;

    return {
      latency_ms: latencyMs,
      model,
      model_resolved: model,
      provider,
      finish_reason: result.finish_reason || result.stop_reason || 'stop',
      response_id: result.id || result.response_id || undefined,
      usage: {
        raw: usage,
        input_tokens: inputTokens,
        output_tokens: outputTokens,
        total_tokens: inputTokens + outputTokens
      }
    };
  }
  /**
   * Build the `_meta` object for a failed LLM call.
   *
   * @param {Error} err - Error raised by the adapter; serialized with `buildErrorMap`.
   * @param {number} latencyMs - Time spent in the call before it failed, in ms.
   * @returns {{latency_ms: number, error: *}}
   */
  function buildMetaFromError(err, latencyMs) {
    const meta = {
      latency_ms: latencyMs,
      error: buildErrorMap(err)
    };
    return meta;
  }
  /**
   * Append referenced documents to `params.system` (mutates `params`).
   *
   * `params.docs` may be a single id or an array of ids. Each id is fetched
   * through `ctx.docAdapter.get`; ids that fail to load or return empty
   * content are skipped. The fetched documents are joined with a `---`
   * separator and appended to the existing system prompt, or become it.
   * `params.docs` is removed afterwards.
   *
   * Nothing happens, and `params.docs` is left in place, when there are no
   * docs or the context has no doc adapter.
   *
   * @param {object} ctx - Execution context; `docAdapter` is optional.
   * @param {object} params - Evaluated LLM parameters.
   * @returns {Promise<void>}
   */
  async function injectDocs(ctx, params) {
    const requested = params.docs;
    if (!requested || !ctx.docAdapter) return;

    const ids = Array.isArray(requested) ? requested : [requested];
    const loaded = [];
    for (const docId of ids) {
      let text;
      try {
        text = await ctx.docAdapter.get(String(docId));
      } catch {
        // Best-effort: an unavailable document must not fail the LLM step.
        continue;
      }
      if (text) loaded.push(text);
    }

    if (loaded.length > 0) {
      const docsBlock = loaded.join('\n\n---\n\n');
      params.system = params.system ? params.system + '\n\n' + docsBlock : docsBlock;
    }
    delete params.docs;
  }
  /**
   * Replace `params.output_config.schemaRef` with the schema it names.
   *
   * The schema is looked up via `engine.registry.getSchema`. When a schema is
   * found it is stored as `output_config.schema` and `schemaRef` is removed;
   * otherwise `params` is left untouched. Does nothing without a `schemaRef`
   * or without a registry.
   *
   * @param {object} engine - Engine instance; `registry` is optional.
   * @param {object} params - Evaluated LLM parameters (mutated in place).
   * @returns {void}
   */
  function resolveSchemaRef(engine, params) {
    const outputConfig = params.output_config;
    if (!outputConfig?.schemaRef) return;
    if (!engine.registry) return;

    const resolved = engine.registry.getSchema(outputConfig.schemaRef);
    if (!resolved) return;

    outputConfig.schema = resolved;
    delete outputConfig.schemaRef;
  }
  187. // ─── API_* ───────────────────────────────────────────────────
  /**
   * Run an `API_*` step against the API definition registered for its id.
   *
   * The definition is looked up with the `API_` prefix removed. When it
   * declares `auth`, that expression is evaluated and, unless it yields
   * null/undefined, passed to the adapter as `params.authToken`.
   *
   * @param {object} engine - Engine instance (`registry`, output mapping).
   * @param {object} ctx - Execution context (variables and adapters).
   * @param {object} step - Step definition (`id`, optional `in` / `out`).
   * @returns {Promise<void>}
   * @throws {Error} When no API definition exists for the step.
   */
  async function executeAPIStep(engine, ctx, step) {
    const apiID = step.id.replace(/^API_/, '');
    const definition = engine.registry.getAPIDefinition(apiID);
    if (!definition) {
      throw new Error(`API definition not found: ${apiID}`);
    }

    const evaluator = new ExpressionEvaluator(ctx);
    const callParams = evaluator.evaluateDeep(step.in || {});

    // Resolve auth token: "="-prefixed strings go through evaluateValue,
    // anything else through evaluate.
    const authExpr = definition.auth;
    if (authExpr) {
      const token = authExpr.startsWith('=')
        ? evaluator.evaluateValue(authExpr)
        : evaluator.evaluate(authExpr);
      if (token !== null && token !== undefined) {
        callParams.authToken = token;
      }
    }

    const response = await ctx.apiAdapter.call(definition, callParams);
    ctx.setVariable('_result', response);
    await applyOutputMapping(engine, ctx, step);
    ctx.setVariable('_result', undefined);
  }
  204. // ─── Set_* ───────────────────────────────────────────────────
  /**
   * Run a `Set_*` step: evaluate `step.value` and assign it to `step.target`.
   *
   * A step with neither a truthy `value` nor a truthy `target` is a no-op
   * (such steps may carry only an `out` mapping).
   *
   * @param {object} engine - Engine instance (unused here).
   * @param {object} ctx - Execution context the evaluator is bound to.
   * @param {object} step - Step definition (`value`, `target`).
   * @returns {void}
   */
  function executeSetStep(engine, ctx, step) {
    const hasAssignment = Boolean(step.value || step.target);
    if (!hasAssignment) return;

    const evaluator = new ExpressionEvaluator(ctx);
    const computed = evaluator.evaluateValue(step.value);
    evaluator.setVariable(step.target, computed);
  }
  212. // ─── Write_* ─────────────────────────────────────────────────
  /**
   * Run a `Write_*` step: write the evaluated `step.value` to the file named
   * by the evaluated `step.target`.
   *
   * The target has `{expr}` placeholders interpolated and `.tmp/` paths
   * scoped to the workflow. The written path is recorded in `ctx.artifacts`
   * and a `FileDone` event is emitted.
   *
   * @param {object} engine - Engine instance (unused here).
   * @param {object} ctx - Execution context (`fileAdapter`, `artifacts`).
   * @param {object} step - Step definition (`target`, `value`, optional `mode`).
   * @returns {Promise<void>}
   * @throws {Error} When the target does not evaluate to a string.
   */
  async function executeWriteStep(engine, ctx, step) {
    const evaluator = new ExpressionEvaluator(ctx);

    const rawTarget = evaluator.evaluateValue(step.target);
    if (typeof rawTarget !== 'string') {
      throw new Error(`Write target must be string, got ${typeof rawTarget}`);
    }
    const destination = resolveTmpPath(ctx, interpolateFilePath(evaluator, rawTarget));

    const bytes = toBytes(evaluator.evaluateValue(step.value));
    const writeMode = step.mode || WriteMode.Overwrite;

    await ctx.fileAdapter.write(destination, bytes, writeMode);
    ctx.artifacts[destination] = destination;
    emitEvent(ctx, RunEventType.FileDone, step.id, {
      path: destination,
      size_bytes: bytes.length
    });
  }
  226. // ─── Branch_* / Check_* ──────────────────────────────────────
  /**
   * Decide which step a `Branch_*` / `Check_*` step routes to.
   *
   * - Check style (`step.condition` defined): returns `if_true` or `if_false`.
   * - Branch style: walks `step.cases` in order and returns the target of the
   *   first case whose condition is truthy, or of the first `ELSE` case
   *   reached. Cases are `[expr, stepId]` arrays or `{condition, next|step}`
   *   objects; malformed cases are skipped.
   *
   * @param {object} engine - Engine instance (unused here).
   * @param {object} ctx - Execution context the evaluator is bound to.
   * @param {object} step - Step definition.
   * @returns {string|null} Id of the next step, `step.default`, or null.
   */
  function executeBranchStep(engine, ctx, step) {
    const evaluator = new ExpressionEvaluator(ctx);

    // "="-prefixed strings are value expressions; everything else is a plain expression.
    const holds = (expr) => {
      const isValueExpr = typeof expr === 'string' && expr.startsWith('=');
      const outcome = isValueExpr ? evaluator.evaluateValue(expr) : evaluator.evaluate(expr);
      return toBool(outcome);
    };

    // Check_* style: condition / if_true / if_false
    if (step.condition !== undefined) {
      const chosen = holds(step.condition) ? step.if_true : step.if_false;
      return chosen || null;
    }

    // Standard Branch_* style: cases array
    for (const entry of step.cases || []) {
      let expr;
      let stepId;
      if (Array.isArray(entry)) {
        // [expr, stepId]
        if (entry.length < 2) continue;
        expr = entry[0];
        stepId = entry[1];
      } else if (entry && typeof entry === 'object') {
        // {condition, next} (or {condition, step})
        expr = entry.condition;
        stepId = entry.next || entry.step;
        if (!expr || !stepId) continue;
      } else {
        continue;
      }

      if (expr === 'ELSE' || holds(expr)) return stepId;
    }

    // Default fallback (for object-style cases)
    return step.default || null;
  }
  258. // ─── Loop_* ──────────────────────────────────────────────────
  // Control-flow sentinel: thrown from inside a loop's children when a step
  // routes to `next: "BREAK"`, and caught by the loop executor to end the loop.
  class LoopBreakSignal {
    type = 'LOOP_BREAK';
  }
  /**
   * Execute a `Loop_*` step.
   *
   * Two shapes are supported:
   * - **while mode** (`step.while` set): the condition is evaluated before
   *   every iteration; always serial; only `_index` / `_iterDir` are set.
   * - **source mode**: iterates the array `step.source` evaluates to, either
   *   serially (default) or through `engine.parallelExecutor`.
   *
   * `step.maxIterations` caps the iteration count in both shapes. A
   * `LoopBreakSignal` thrown by a child ends the loop. When
   * `ctx._loopProgress` exists, the number of completed iterations is
   * recorded there and used as the starting index.
   *
   * NOTE(review): progress for this step id is not reset here once the loop
   * finishes. If the same loop step can run twice in one execution (e.g.
   * nested inside another loop) it would start past the end — confirm the
   * engine clears it.
   *
   * @param {object} engine - Engine instance (`getStep`, `executeStep`,
   *   `parallelExecutor`, `errorStrategy`, optional `_emitCheckpoint`).
   * @param {object} ctx - Execution context.
   * @param {object} step - Loop step definition.
   * @returns {Promise<void>}
   */
  async function executeLoopStep(engine, ctx, step) {
    const ev = new ExpressionEvaluator(ctx);
    const mode = step.mode || 'serial';
    const children = step.children || [];
    const isWhileMode = step.while != null;

    // ─── While mode (v3.16) ─────────────────────────────
    if (isWhileMode) {
      const maxIter = step.maxIterations || Infinity;
      // Remember the loop locals of an enclosing loop (undefined when there
      // is none) so they can be put back after each iteration. Resetting
      // them to undefined would leave the enclosing loop's remaining
      // children without `_index` / `_iterDir`.
      const outerIndex = ctx.getVariable('_index');
      const outerIterDir = ctx.getVariable('_iterDir');
      let i = ctx._loopProgress?.get(step.id) || 0;
      while (i < maxIter) {
        if (ctx.aborted || ctx.status === ExecutionStatus.Stopped) break;
        // Evaluate while condition before each iteration
        const condVal = ev.evaluateValue(step.while);
        if (!toBool(condVal)) break;
        // Set loop locals (_item is not available in while mode, _index is)
        ctx.setVariable('_index', i);
        ctx.setVariable('_iterDir', `${step.id}_${i}`);
        let broke = false;
        try {
          for (const childID of children) {
            const childStep = engine.getStep(childID);
            if (childStep) await engine.executeStep(ctx, childStep);
          }
        } catch (e) {
          if (e instanceof LoopBreakSignal) { broke = true; }
          else throw e;
        } finally {
          ctx.setVariable('_index', outerIndex);
          ctx.setVariable('_iterDir', outerIterDir);
        }
        i++;
        if (ctx._loopProgress) {
          ctx._loopProgress.set(step.id, i);
          if (engine._emitCheckpoint) engine._emitCheckpoint(ctx);
        }
        if (broke) break;
      }
      return; // while mode always serial, done
    }

    // ─── Source (array) mode ────────────────────────────
    let sourceExpr = step.source;
    // A bare string source is treated as a value expression.
    if (typeof sourceExpr === 'string' && !sourceExpr.startsWith('=')) sourceExpr = '=' + sourceExpr;
    const rawSource = ev.evaluateValue(sourceExpr);
    const source = Array.isArray(rawSource) ? rawSource : [];
    if (!Array.isArray(rawSource)) {
      // A non-array source is reported and treated as an empty loop, not an error.
      emitEvent(ctx, RunEventType.StepSkipped, step.id, {
        reason: `Loop source is ${rawSource === undefined ? 'undefined' : typeof rawSource} (expected array) — skipping loop`,
        expression: sourceExpr,
      });
    }

    // Apply maxIterations cap if present (v3.16)
    const maxIter = step.maxIterations ? Math.min(source.length, step.maxIterations) : source.length;
    const startIndex = ctx._loopProgress?.get(step.id) || 0;

    if (mode === 'serial') {
      for (let i = startIndex; i < maxIter; i++) {
        if (ctx.aborted || ctx.status === ExecutionStatus.Stopped) break;
        const broke = await executeLoopIteration(engine, ctx, step, children, source[i], i);
        if (ctx._loopProgress) {
          ctx._loopProgress.set(step.id, i + 1);
          if (engine._emitCheckpoint) engine._emitCheckpoint(ctx);
        }
        if (broke) break; // BREAK encountered
      }
    } else {
      // Parallel — BREAK support: flag to stop launching new iterations
      const { ChildExecutionContext } = require('./types');
      let breakTriggered = false;
      const branches = source.slice(startIndex, maxIter).map((item, offset) => {
        const index = startIndex + offset;
        return {
          id: `${step.id}[${index}]`,
          fn: async () => {
            if (breakTriggered) return; // Don't start new iterations after BREAK
            // Each branch gets its own child context carrying the loop locals.
            const childCtx = new ChildExecutionContext(ctx);
            childCtx.localVars._item = item;
            childCtx.localVars._index = index;
            childCtx.localVars._iterDir = `${step.id}_${index}`;
            try {
              for (const childID of children) {
                const childStep = engine.getStep(childID);
                if (childStep) await engine.executeStep(childCtx, childStep);
              }
            } catch (e) {
              if (e instanceof LoopBreakSignal) { breakTriggered = true; return; }
              throw e;
            }
          }
        };
      });
      await engine.parallelExecutor.execute(branches, engine.errorStrategy, ctx.signal);
      if (ctx._loopProgress) {
        ctx._loopProgress.set(step.id, maxIter);
      }
    }
  }
  /**
   * Execute one serial loop iteration.
   *
   * Sets the loop locals `_item`, `_index` and `_iterDir`, runs every child
   * step that `engine.getStep` resolves, then puts the loop locals back to
   * the values they had before the iteration. Restoring instead of resetting
   * to undefined matters for nested loops: the enclosing loop's remaining
   * children still see their own `_item` / `_index` / `_iterDir`. For a
   * top-level loop the previous values are undefined, so the locals are
   * cleared as before.
   *
   * @param {object} engine - Engine instance (`getStep`, `executeStep`).
   * @param {object} ctx - Execution context (`getVariable`, `setVariable`).
   * @param {object} step - The loop step (its `id` names the iteration dir).
   * @param {string[]} children - Ids of the child steps to run, in order.
   * @param {*} item - Current element of the loop source.
   * @param {number} index - Zero-based iteration index.
   * @returns {Promise<boolean>} True if BREAK was encountered.
   * @throws Rethrows any child error other than `LoopBreakSignal`.
   */
  async function executeLoopIteration(engine, ctx, step, children, item, index) {
    const outerItem = ctx.getVariable('_item');
    const outerIndex = ctx.getVariable('_index');
    const outerIterDir = ctx.getVariable('_iterDir');

    ctx.setVariable('_item', item);
    ctx.setVariable('_index', index);
    ctx.setVariable('_iterDir', `${step.id}_${index}`);

    let broke = false;
    try {
      for (const childID of children) {
        const childStep = engine.getStep(childID);
        if (childStep) await engine.executeStep(ctx, childStep);
      }
    } catch (e) {
      if (e instanceof LoopBreakSignal) { broke = true; }
      else throw e;
    } finally {
      ctx.setVariable('_item', outerItem);
      ctx.setVariable('_index', outerIndex);
      ctx.setVariable('_iterDir', outerIterDir);
    }
    return broke;
  }
  379. // ─── Download_* (v3.14+) ────────────────────────────────────
  /**
   * Execute a `Download_*` step (v3.14+): fetch a URL and optionally store it.
   *
   * `step.source` is either a URL expression or `{ url, headers, timeout }`
   * (timeout in seconds; the default is 5 minutes). The target is the
   * evaluated `step.target`, or else it is derived from the URL's filename
   * via `routeByExt` / `defaultDir`. Without a target nothing is written and
   * `_result` is `{ size, url }`; otherwise the file is written and
   * `_result` is `{ path }`.
   *
   * The request is aborted on timeout, and also when `ctx.signal` aborts if
   * that signal supports `addEventListener`.
   *
   * @param {object} engine - Engine instance, forwarded to `applyOutputMapping`.
   * @param {object} ctx - Execution context (`fileAdapter`, `artifacts`, optional `signal`).
   * @param {object} step - Step definition.
   * @returns {Promise<void>}
   * @throws {Error} On a missing URL, a non-OK HTTP status, or a non-string target.
   */
  async function executeDownloadStep(engine, ctx, step) {
    const ev = new ExpressionEvaluator(ctx);
    let url;
    let headers = {};
    let timeout = 300000; // 5 min default
    if (typeof step.source === 'string') {
      url = ev.evaluateValue(step.source);
    } else if (step.source && typeof step.source === 'object') {
      url = ev.evaluateValue(step.source.url);
      if (step.source.headers) headers = ev.evaluateDeep(step.source.headers);
      if (step.source.timeout) timeout = step.source.timeout * 1000;
    }
    if (!url) throw new Error('Download source URL required');

    // Fetch
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), timeout);

    // Forward a run-level abort to the in-flight request so a cancelled
    // workflow does not keep downloading until the timeout fires.
    const runSignal = ctx.signal;
    const canListen = runSignal != null && typeof runSignal.addEventListener === 'function';
    const onRunAbort = () => controller.abort();
    if (canListen) {
      if (runSignal.aborted) controller.abort();
      else runSignal.addEventListener('abort', onRunAbort, { once: true });
    }

    try {
      const resp = await fetch(url, { headers, signal: controller.signal });
      if (!resp.ok) throw new Error(`Download failed: ${resp.status} ${resp.statusText}`);
      const data = Buffer.from(await resp.arrayBuffer());

      // Determine target
      let targetPath = step.target ? ev.evaluateValue(step.target) : null;
      if (!targetPath && (step.routeByExt || step.defaultDir)) {
        const filename = extractFilename(url);
        targetPath = routePathByExt(filename, step.routeByExt, step.defaultDir);
      }
      if (!targetPath) {
        // No destination: report the download without writing anything.
        ctx.setVariable('_result', { size: data.length, url });
        await applyOutputMapping(engine, ctx, step);
        ctx.setVariable('_result', undefined);
        return;
      }
      // Same rule as Write steps. A non-string target used to interpolate
      // to an empty path and be written there silently.
      if (typeof targetPath !== 'string') {
        throw new Error(`Download target must be string, got ${typeof targetPath}`);
      }
      targetPath = interpolateFilePath(ev, targetPath);
      targetPath = resolveTmpPath(ctx, targetPath);
      await ctx.fileAdapter.write(targetPath, data, WriteMode.Overwrite);
      ctx.artifacts[targetPath] = targetPath;
      emitEvent(ctx, RunEventType.FileDone, step.id, { path: targetPath, size_bytes: data.length });
      ctx.setVariable('_result', { path: targetPath });
    } finally {
      clearTimeout(timer);
      if (canListen) runSignal.removeEventListener('abort', onRunAbort);
    }
    await applyOutputMapping(engine, ctx, step);
    ctx.setVariable('_result', undefined);
  }
  422. // ─── Unzip_* (v3.14+) ───────────────────────────────────────
  /**
   * Execute an `Unzip_*` step (v3.14+): extract an archive through the file adapter.
   *
   * The archive at `step.source` is read and expanded with
   * `ctx.fileAdapter.unzip`. Each file entry is routed with `routePathByExt`
   * (`step.routeByExt` / `step.defaultDir`); entries that route nowhere,
   * directories and unsafe entry names are skipped. `_result` is
   * `{ count, files }` during output mapping.
   *
   * @param {object} engine - Engine instance, forwarded to `applyOutputMapping`.
   * @param {object} ctx - Execution context (`fileAdapter`, `artifacts`).
   * @param {object} step - Step definition (`source`, `overwrite`, `routeByExt`, `defaultDir`).
   * @returns {Promise<void>}
   * @throws {Error} When the source is missing or the adapter cannot unzip.
   */
  async function executeUnzipStep(engine, ctx, step) {
    // Fail fast, before any I/O: a missing source used to become the literal
    // path "undefined", and the capability check ran only after the archive
    // had already been read.
    if (step.source == null || step.source === '') throw new Error('Unzip source path required');
    if (!ctx.fileAdapter.unzip) throw new Error('File adapter does not support unzip');

    const ev = new ExpressionEvaluator(ctx);
    let sourcePath = typeof step.source === 'string' ? ev.evaluateValue(step.source) : String(step.source);
    sourcePath = interpolateFilePath(ev, sourcePath);
    sourcePath = resolveTmpPath(ctx, sourcePath);
    const zipData = await ctx.fileAdapter.read(sourcePath);

    // `overwrite` defaults to true; only an explicit false protects existing files.
    const overwrite = step.overwrite !== false;
    const mode = overwrite ? WriteMode.Overwrite : WriteMode.FailIfExists;
    const entries = await ctx.fileAdapter.unzip(zipData);

    let count = 0;
    const files = [];
    for (const entry of entries) {
      if (entry.isDirectory) continue;
      // Zip-slip safety
      if (isUnsafeZipEntryName(entry.name)) continue;
      let targetPath = routePathByExt(entry.name, step.routeByExt, step.defaultDir);
      if (!targetPath) continue;
      targetPath = resolveTmpPath(ctx, targetPath);
      await ctx.fileAdapter.write(targetPath, entry.data, mode);
      ctx.artifacts[targetPath] = targetPath;
      emitEvent(ctx, RunEventType.FileDone, step.id, { path: targetPath, size_bytes: entry.data.length });
      files.push(targetPath);
      count++;
    }

    ctx.setVariable('_result', { count, files });
    await applyOutputMapping(engine, ctx, step);
    ctx.setVariable('_result', undefined);
  }

  /**
   * True when an archive entry name could escape the extraction root: an
   * absolute path (leading "/" or "\", or a drive letter) or any ".." path
   * segment under either separator. A name that merely contains ".." inside
   * a segment (e.g. "notes..txt") is not a traversal and is allowed.
   *
   * @param {string} name - Entry name as stored in the archive.
   * @returns {boolean}
   */
  function isUnsafeZipEntryName(name) {
    if (name.startsWith('/') || name.startsWith('\\')) return true;
    if (/^[A-Za-z]:/.test(name)) return true;
    return name.split(/[\\/]/).includes('..');
  }
  454. // ─── Pause_* (v3.15+) ───────────────────────────────────────
  /**
   * Execute a `Pause_*` step (v3.15+): suspend the run until it is resumed or times out.
   *
   * A pause state (wait token plus a pending promise) is stored on
   * `ctx.pauseState` and the status is set to Paused. The step then waits
   * for `pauseState._resolve` to be called, or for `step.timeout.sec` /
   * `.seconds` to elapse. On timeout the optional `step.timeout.on` step is
   * run. On resume the payload is stored in `step.resumeResultTarget` and
   * `step.next` is followed. In both cases the pause does its own routing.
   *
   * @param {object} engine - Engine instance (`getStep`, `executeStep`).
   * @param {object} ctx - Execution context.
   * @param {object} step - Pause step definition.
   * @returns {Promise<{handled: boolean}>} Always `{ handled: true }`.
   * @throws {LoopBreakSignal} When resumed and `step.next === 'BREAK'`.
   */
  async function executePauseStep(engine, ctx, step) {
    // The wait token is what a caller presents to resume the run, so it must
    // not be guessable. Workflow id, step id and timestamp are all
    // predictable, so random bytes are mixed into the hash. The token is
    // still a 64-character hex string.
    const token = crypto.createHash('sha256')
      .update(`wt_${ctx.workflowID}_${step.id}_${Date.now()}_`)
      .update(crypto.randomBytes(32))
      .digest('hex');

    const pauseState = {
      token,
      nodeID: step.id,
      seenRequestIDs: new Set(),
      resumed: false,
      _resolve: null,
      _promise: null
    };
    // Create a promise that resume() will resolve
    pauseState._promise = new Promise(resolve => { pauseState._resolve = resolve; });
    ctx.pauseState = pauseState;
    ctx.status = ExecutionStatus.Paused;

    const timeoutSec = step.timeout ? (step.timeout.sec || step.timeout.seconds || 0) : 0;
    const expireAt = timeoutSec > 0 ? new Date(Date.now() + timeoutSec * 1000).toISOString() : null;
    emitEvent(ctx, RunEventType.PauseStart, step.id, {
      nodeId: step.id,
      waitToken: token,
      resumeResultTarget: step.resumeResultTarget || null,
      reason: step.reason || null,
      expireAt
    });

    // Wait for resume or timeout
    let result;
    let timer = null;
    try {
      if (timeoutSec > 0) {
        const timeoutMs = timeoutSec * 1000;
        result = await Promise.race([
          pauseState._promise,
          new Promise(resolve => {
            timer = setTimeout(() => resolve({ timeout: true }), timeoutMs);
          })
        ]);
      } else {
        result = await pauseState._promise;
      }
    } finally {
      // Without this a resumed pause leaves its timer pending, which keeps
      // the process alive until the full timeout has elapsed.
      if (timer !== null) clearTimeout(timer);
    }

    if (result?.timeout) {
      // Timeout
      emitEvent(ctx, RunEventType.PauseTimeout, step.id, {
        nodeId: step.id, expiredAt: new Date().toISOString(),
        timeoutAction: step.timeout.on || null
      });
      ctx.status = ExecutionStatus.Running;
      ctx.pauseState = null;
      if (step.timeout.on) {
        const timeoutStep = engine.getStep(step.timeout.on);
        if (timeoutStep) await engine.executeStep(ctx, timeoutStep);
      }
      return { handled: true }; // Pause handled its own next routing
    }

    // Resumed
    if (step.resumeResultTarget && result?.payload !== undefined) {
      const ev = new ExpressionEvaluator(ctx);
      ev.setVariable(step.resumeResultTarget, result.payload);
    }
    emitEvent(ctx, RunEventType.PauseResumed, step.id, {
      nodeId: step.id, requestId: result?.requestId || null,
      resumedAt: new Date().toISOString()
    });
    ctx.status = ExecutionStatus.Running;
    ctx.pauseState = null;

    // Pause_* handles its own step_done and next
    emitEvent(ctx, RunEventType.StepDone, step.id, {
      outputs: collectPauseOutputs(ctx, step)
    });
    if (step.next === 'BREAK') throw new LoopBreakSignal();
    if (step.next && step.next !== 'RETURN' && step.next !== 'STOP') {
      const nextStep = engine.getStep(step.next);
      if (nextStep) await engine.executeStep(ctx, nextStep);
    }
    return { handled: true };
  }
  527. // ─── Output Mapping ──────────────────────────────────────────
  /**
   * Apply a step's `out` mapping.
   *
   * - String shorthand (`out: "$plan"`): assigns the current `_result` to that variable.
   * - Object form: each key is a target and each value an expression.
   *   Targets starting with `/` are file writes (the path is interpolated,
   *   the value converted to bytes, the path recorded as an artifact);
   *   targets starting with `$` or `_` are variable assignments.
   *
   * A file target that resolves to an empty path or to "/" is reported with
   * a `StepError` event and skipped.
   *
   * @param {object} engine - Engine instance (unused here).
   * @param {object} ctx - Execution context (`fileAdapter`, `artifacts`, variables).
   * @param {object} step - Step definition (`id`, `out`).
   * @returns {Promise<void>}
   * @throws {Error} On a target that is neither a file path nor a variable.
   */
  async function applyOutputMapping(engine, ctx, step) {
    const mapping = step.out;
    if (!mapping) return;

    const evaluator = new ExpressionEvaluator(ctx);

    // Shorthand: out is a string like "$plan" → means { "$plan": "=_result" }
    if (typeof mapping === 'string') {
      evaluator.setVariable(mapping, ctx.getVariable('_result'));
      return;
    }

    for (const [target, valueExpr] of Object.entries(mapping)) {
      // The value is evaluated before the target is classified.
      const value = evaluator.evaluateValue(valueExpr);
      const isFileTarget = target.startsWith('/');

      if (!isFileTarget) {
        const isVariableTarget = target.startsWith('$') || target.startsWith('_');
        if (!isVariableTarget) {
          throw new Error(`Invalid output target: ${target}`);
        }
        evaluator.setVariable(target, value);
        continue;
      }

      // File write
      const filePath = interpolateFilePath(evaluator, target);
      // Guard: reject paths that resolved to empty or bare "/" (missing filePath interpolation)
      if (!filePath || filePath === '/') {
        emitEvent(ctx, RunEventType.StepError, step.id, {
          error: `File write skipped: path template "${target}" resolved to empty path — missing filePath in metadata`
        });
        continue;
      }
      const bytes = toBytes(value);
      await ctx.fileAdapter.write(filePath, bytes, WriteMode.Overwrite);
      ctx.artifacts[filePath] = filePath;
      emitEvent(ctx, RunEventType.FileDone, step.id, { path: filePath, size_bytes: bytes.length });
    }
  }
  559. // ─── Helpers ─────────────────────────────────────────────────
  /**
   * Expand `{expr}` placeholders in a path template.
   *
   * Each placeholder is replaced by `String(ev.evaluate(expr))`, or by an
   * empty string when the expression yields null/undefined. A `{` without
   * a closing `}` is kept verbatim together with the rest of the path.
   *
   * @param {{evaluate: function(string): *}} ev - Expression evaluator.
   * @param {string} path - Path template.
   * @returns {string} The path with every placeholder substituted.
   */
  function interpolateFilePath(ev, path) {
    const total = path.length;
    const pieces = [];
    let cursor = 0;

    while (cursor < total) {
      const open = path.indexOf('{', cursor);
      if (open < 0) {
        // No more placeholders: the rest is literal text.
        pieces.push(path.slice(cursor));
        break;
      }
      pieces.push(path.slice(cursor, open));

      const close = path.indexOf('}', open);
      if (close < 0) {
        // Unterminated placeholder: keep it as written.
        pieces.push(path.slice(open));
        break;
      }

      const resolved = ev.evaluate(path.slice(open + 1, close));
      pieces.push(resolved != null ? String(resolved) : '');
      cursor = close + 1;
    }

    return pieces.join('');
  }
  /**
   * Scope a `.tmp/` path to the current workflow.
   *
   * `.tmp/foo` becomes `.tmp/<workflowID>/foo`; any other path is returned unchanged.
   *
   * @param {{workflowID: *}} ctx - Execution context.
   * @param {string} path - Path to resolve.
   * @returns {string}
   */
  function resolveTmpPath(ctx, path) {
    const TMP_PREFIX = '.tmp/';
    return path.startsWith(TMP_PREFIX)
      ? `.tmp/${ctx.workflowID}/${path.slice(TMP_PREFIX.length)}`
      : path;
  }
  /**
   * Convert a value into the bytes to be written to a file.
   *
   * - strings are UTF-8 encoded;
   * - Buffers are returned as-is;
   * - typed arrays, DataViews and ArrayBuffers are wrapped as raw bytes
   *   (they used to be JSON-serialized as `{"0":..,"1":..}`);
   * - everything else is JSON-serialized.
   *
   * @param {*} value - Value to serialize.
   * @returns {Buffer}
   * @throws {TypeError} When the value has no JSON representation
   *   (undefined, functions, symbols). This used to surface as an opaque
   *   `Buffer.from` argument error.
   */
  function toBytes(value) {
    if (typeof value === 'string') return Buffer.from(value, 'utf8');
    if (Buffer.isBuffer(value)) return value;
    if (ArrayBuffer.isView(value)) {
      // Respect the view's window into its underlying ArrayBuffer.
      return Buffer.from(value.buffer, value.byteOffset, value.byteLength);
    }
    if (value instanceof ArrayBuffer) return Buffer.from(value);

    const json = JSON.stringify(value);
    if (json === undefined) {
      throw new TypeError(`Cannot convert value of type ${typeof value} to bytes`);
    }
    return Buffer.from(json, 'utf8');
  }
  /**
   * Derive a filename from a URL: the last segment of its path.
   *
   * Falls back to "download" when the URL cannot be parsed or its path ends
   * in "/" (empty last segment).
   *
   * @param {string} url - URL to inspect.
   * @returns {string}
   */
  function extractFilename(url) {
    const FALLBACK = 'download';
    let pathname;
    try {
      pathname = new URL(url).pathname;
    } catch {
      return FALLBACK;
    }
    const lastSegment = pathname.slice(pathname.lastIndexOf('/') + 1);
    return lastSegment || FALLBACK;
  }
  /**
   * Choose the destination path for a file based on its extension.
   *
   * - With neither `routeByExt` nor `defaultDir`, the filename is returned unchanged.
   * - If `routeByExt` has a directory for the file's extension (including
   *   the leading dot, e.g. ".png"), the result is `<dir>/<filename>`.
   * - Otherwise `<defaultDir>/<filename>` when a default directory is given.
   * - Otherwise null, meaning "skip this file".
   *
   * The extension is taken from the last path segment only, so a dotted
   * directory name in front of an extension-less file ("pkg.v2/README") is
   * not mistaken for an extension.
   *
   * @param {string} filename - File name, possibly with directory segments.
   * @param {Object<string, string>} [routeByExt] - Extension -> directory map.
   * @param {string} [defaultDir] - Directory for unmatched files.
   * @returns {string|null}
   */
  function routePathByExt(filename, routeByExt, defaultDir) {
    if (!routeByExt && !defaultDir) return filename;

    const baseName = filename.slice(filename.lastIndexOf('/') + 1);
    const dotAt = baseName.lastIndexOf('.');
    const ext = dotAt >= 0 ? baseName.slice(dotAt) : '';

    if (routeByExt && ext && routeByExt[ext]) {
      return `${routeByExt[ext]}/${filename}`;
    }
    if (defaultDir) return `${defaultDir}/${filename}`;
    return null; // Skip
  }
  /**
   * Build the `outputs` payload for a pause step's StepDone event.
   *
   * @param {object} ctx - Execution context (`getVariable`).
   * @param {object} step - Pause step definition.
   * @returns {Object|null} `{ [resumeResultTarget]: currentValue }`, or null
   *   when the step declares no `resumeResultTarget`.
   */
  function collectPauseOutputs(ctx, step) {
    const target = step.resumeResultTarget;
    if (!target) return null;
    return { [target]: ctx.getVariable(target) };
  }
  /**
   * Emit a run event through the context in the normalized shape
   * `{ type, stepID, payload }`.
   *
   * @param {object} ctx - Execution context (`emitEvent`).
   * @param {*} type - Event type (a `RunEventType` value).
   * @param {string} [stepId] - Id of the originating step; falsy becomes null.
   * @param {object} [payload] - Event payload; falsy becomes `{}`.
   * @returns {void}
   */
  function emitEvent(ctx, type, stepId, payload) {
    const event = {
      type,
      stepID: stepId || null,
      payload: payload || {}
    };
    ctx.emitEvent(event);
  }
  611. module.exports = {
  612. executeServiceStep, executeComponentStep, executeLLMStep,
  613. executeAPIStep, executeSetStep, executeWriteStep,
  614. executeBranchStep, executeLoopStep,
  615. executeDownloadStep, executeUnzipStep, executePauseStep,
  616. applyOutputMapping, emitEvent, interpolateFilePath, resolveTmpPath,
  617. LoopBreakSignal
  618. };