/** * VL Workflow Engine — Step Executor * Ported from Go: workflow/executor.go * Spec: v3.16 * * Handles all 12 step types: Service, API, Component, LLM, Set, Write, * Download, Unzip, Pause, Branch, Loop, Stop (+ Noop) * * v3.16 additions: * - Loop `while` mode (condition loop, mutually exclusive with `source`) * - `BREAK` keyword in Loop children (exits entire loop) * - LLM `model` field: `/` / `` / omitted */ const crypto = require('crypto'); const { ExpressionEvaluator, toBool } = require('./expression'); const { RunEventType, WriteMode, ExecutionStatus, isV310OrLater, buildErrorMap } = require('./types'); // ─── Service_* ─────────────────────────────────────────────── async function executeServiceStep(engine, ctx, step) { const serviceName = step.id.replace(/^Service_/, ''); const ev = new ExpressionEvaluator(ctx); const params = ev.evaluateDeep(step.in || {}); const result = await ctx.serviceAdapter.call(serviceName, params); ctx.setVariable('_result', result?.data || result); await applyOutputMapping(engine, ctx, step); ctx.setVariable('_result', undefined); } // ─── Component_* ───────────────────────────────────────────── async function executeComponentStep(engine, ctx, step) { const componentID = step.id.replace(/^Component_/, ''); const ev = new ExpressionEvaluator(ctx); const params = ev.evaluateDeep(step.in || {}); const result = await ctx.componentAdapter.call(componentID, params); ctx.setVariable('_result', result); await applyOutputMapping(engine, ctx, step); ctx.setVariable('_result', undefined); } // ─── LLM_* ─────────────────────────────────────────────────── async function executeLLMStep(engine, ctx, step) { const ev = new ExpressionEvaluator(ctx); const params = ev.evaluateDeep(step.in || {}); // v3.16 — LLM model field: /, only, or omitted if (step.model) { const parts = step.model.split('/'); if (parts.length === 2) { params._provider = parts[0]; params.model = parts[1]; } else if (parts.length === 1) { params._provider = parts[0]; // modelId omitted — adapter picks default for provider } } // Inject docs into system message await injectDocs(ctx, params); // Resolve schema references resolveSchemaRef(engine, params); const isStreaming = params.stream === true; let streamTokens = []; // Extended streaming callbacks — separate thinking, response, tool_use, tool_result const callbacks = isStreaming ? { onToken: (delta) => { streamTokens.push(delta); emitEvent(ctx, RunEventType.LLMToken, step.id, { delta }); }, onThinking: (delta) => { emitEvent(ctx, RunEventType.LLMThinking, step.id, { delta }); }, onToolUse: (toolUse) => { emitEvent(ctx, RunEventType.LLMToolUse, step.id, { tool_use_id: toolUse.id, name: toolUse.name, input: toolUse.input }); }, onToolResult: (toolResult) => { emitEvent(ctx, RunEventType.LLMToolResult, step.id, { tool_use_id: toolResult.tool_use_id, content: toolResult.content, is_error: toolResult.is_error || false }); } } : null; // Backward-compatible: pass both onToken (legacy) and callbacks (extended) const onToken = callbacks?.onToken || null; const callStart = Date.now(); let result, error; try { result = await ctx.llmAdapter.call(params, onToken, callbacks); } catch (e) { error = e; } const latencyMs = Date.now() - callStart; if (error) { emitEvent(ctx, RunEventType.LLMError, step.id, { error: error.message, type: error.type || 'unknown', code: error.code || '', retryable: error.retryable || false, latency_ms: latencyMs }); if (isV310OrLater(ctx.version)) { ctx.setVariable('_meta', buildMetaFromError(error, latencyMs)); } throw error; } // Emit llm_done const usage = result.usage || {}; emitEvent(ctx, RunEventType.LLMDone, step.id, { latency_ms: latencyMs, finish_reason: result.finish_reason || result.stop_reason || 'stop', model: result.model || '', usage: { input_tokens: usage.input_tokens || usage.prompt_tokens || 0, output_tokens: usage.output_tokens || usage.completion_tokens || 0, total_tokens: (usage.input_tokens || usage.prompt_tokens || 0) + (usage.output_tokens || usage.completion_tokens || 0) }, // Include thinking summary if available thinking_tokens: usage.thinking_tokens || usage.cache_creation_input_tokens || 0, has_tool_use: !!(result.tool_calls || result.tool_use || []).length }); // Apply result if (isV310OrLater(ctx.version)) { // v3.10+: _result = content only, _meta = metadata let content = result.content || ''; // Unwrap json_object / json_schema structured output const formatType = params.output_config?.format?.type || params.output_config?.type; if (typeof content === 'string' && (formatType === 'json_object' || formatType === 'json_schema')) { try { content = JSON.parse(content); } catch {} } ctx.setVariable('_result', content); ctx.setVariable('_meta', buildMeta(result, latencyMs)); } else { // v3.6–v3.9: _result = full result or just content for structured ctx.setVariable('_result', result); } await applyOutputMapping(engine, ctx, step); ctx.setVariable('_result', undefined); if (isV310OrLater(ctx.version)) ctx.setVariable('_meta', undefined); } function buildMeta(result, latencyMs) { const model = result.model || ''; let provider = 'unknown'; if (model.includes('claude')) provider = 'anthropic'; else if (model.includes('gpt') || model.includes('o1') || model.includes('o3') || model.includes('o4')) provider = 'openai'; const usage = result.usage || {}; return { latency_ms: latencyMs, model, model_resolved: model, provider, finish_reason: result.finish_reason || result.stop_reason || 'stop', response_id: result.id || result.response_id || undefined, usage: { raw: usage, input_tokens: usage.input_tokens || usage.prompt_tokens || 0, output_tokens: usage.output_tokens || usage.completion_tokens || 0, total_tokens: (usage.input_tokens || usage.prompt_tokens || 0) + (usage.output_tokens || usage.completion_tokens || 0) } }; } function buildMetaFromError(err, latencyMs) { return { latency_ms: latencyMs, error: buildErrorMap(err) }; } async function injectDocs(ctx, params) { if (!params.docs || !ctx.docAdapter) return; const docIds = Array.isArray(params.docs) ? params.docs : [params.docs]; const parts = []; for (const id of docIds) { try { const content = await ctx.docAdapter.get(String(id)); if (content) parts.push(content); } catch {} } if (parts.length > 0) { const docsBlock = parts.join('\n\n---\n\n'); if (params.system) params.system = params.system + '\n\n' + docsBlock; else params.system = docsBlock; } delete params.docs; } function resolveSchemaRef(engine, params) { if (!params.output_config?.schemaRef || !engine.registry) return; const ref = params.output_config.schemaRef; const schema = engine.registry.getSchema(ref); if (schema) { params.output_config.schema = schema; delete params.output_config.schemaRef; } } // ─── API_* ─────────────────────────────────────────────────── async function executeAPIStep(engine, ctx, step) { const apiID = step.id.replace(/^API_/, ''); const apiDef = engine.registry.getAPIDefinition(apiID); if (!apiDef) throw new Error(`API definition not found: ${apiID}`); const ev = new ExpressionEvaluator(ctx); const params = ev.evaluateDeep(step.in || {}); // Resolve auth token if (apiDef.auth) { const authVal = apiDef.auth.startsWith('=') ? ev.evaluateValue(apiDef.auth) : ev.evaluate(apiDef.auth); if (authVal != null) params.authToken = authVal; } const result = await ctx.apiAdapter.call(apiDef, params); ctx.setVariable('_result', result); await applyOutputMapping(engine, ctx, step); ctx.setVariable('_result', undefined); } // ─── Set_* ─────────────────────────────────────────────────── function executeSetStep(engine, ctx, step) { // Set steps may only have `out` (file writes handled by applyOutputMapping) if (!step.value && !step.target) return; const ev = new ExpressionEvaluator(ctx); const value = ev.evaluateValue(step.value); ev.setVariable(step.target, value); } // ─── Write_* ───────────────────────────────────────────────── async function executeWriteStep(engine, ctx, step) { const ev = new ExpressionEvaluator(ctx); let targetPath = ev.evaluateValue(step.target); if (typeof targetPath !== 'string') throw new Error(`Write target must be string, got ${typeof targetPath}`); targetPath = interpolateFilePath(ev, targetPath); targetPath = resolveTmpPath(ctx, targetPath); const value = ev.evaluateValue(step.value); const content = toBytes(value); const mode = step.mode || WriteMode.Overwrite; await ctx.fileAdapter.write(targetPath, content, mode); ctx.artifacts[targetPath] = targetPath; emitEvent(ctx, RunEventType.FileDone, step.id, { path: targetPath, size_bytes: content.length }); } // ─── Branch_* / Check_* ────────────────────────────────────── function executeBranchStep(engine, ctx, step) { const ev = new ExpressionEvaluator(ctx); // Check_* style: condition / if_true / if_false if (step.condition !== undefined) { const expr = step.condition; const val = (typeof expr === 'string' && expr.startsWith('=')) ? ev.evaluateValue(expr) : ev.evaluate(expr); return toBool(val) ? (step.if_true || null) : (step.if_false || null); } // Standard Branch_* style: cases array for (const cas of (step.cases || [])) { // Support both formats: [expr, stepId] and {condition, next} let expr, stepId; if (Array.isArray(cas)) { if (cas.length < 2) continue; [expr, stepId] = cas; } else if (cas && typeof cas === 'object') { expr = cas.condition; stepId = cas.next || cas.step; if (!expr || !stepId) continue; } else continue; if (expr === 'ELSE') { return stepId; } const val = (typeof expr === 'string' && expr.startsWith('=')) ? ev.evaluateValue(expr) : ev.evaluate(expr); if (toBool(val)) return stepId; } // Default fallback (for object-style cases) return step.default || null; } // ─── Loop_* ────────────────────────────────────────────────── // Sentinel thrown inside loop children when `next: "BREAK"` is encountered. class LoopBreakSignal { constructor() { this.type = 'LOOP_BREAK'; } } async function executeLoopStep(engine, ctx, step) { const ev = new ExpressionEvaluator(ctx); const mode = step.mode || 'serial'; const children = step.children || []; const isWhileMode = step.while != null; // ─── While mode (v3.16) ───────────────────────────── if (isWhileMode) { const maxIter = step.maxIterations || Infinity; let i = ctx._loopProgress?.get(step.id) || 0; while (i < maxIter) { if (ctx.aborted || ctx.status === ExecutionStatus.Stopped) break; // Evaluate while condition before each iteration const condVal = ev.evaluateValue(step.while); if (!toBool(condVal)) break; // Set loop locals (_item is not available in while mode, _index is) ctx.setVariable('_index', i); ctx.setVariable('_iterDir', `${step.id}_${i}`); let broke = false; try { for (const childID of children) { const childStep = engine.getStep(childID); if (childStep) await engine.executeStep(ctx, childStep); } } catch (e) { if (e instanceof LoopBreakSignal) { broke = true; } else throw e; } finally { ctx.setVariable('_index', undefined); ctx.setVariable('_iterDir', undefined); } i++; if (ctx._loopProgress) { ctx._loopProgress.set(step.id, i); if (engine._emitCheckpoint) engine._emitCheckpoint(ctx); } if (broke) break; } return; // while mode always serial, done } // ─── Source (array) mode ──────────────────────────── let sourceExpr = step.source; if (typeof sourceExpr === 'string' && !sourceExpr.startsWith('=')) sourceExpr = '=' + sourceExpr; const rawSource = ev.evaluateValue(sourceExpr); const source = Array.isArray(rawSource) ? rawSource : []; if (!Array.isArray(rawSource)) { emitEvent(ctx, RunEventType.StepSkipped, step.id, { reason: `Loop source is ${rawSource === undefined ? 'undefined' : typeof rawSource} (expected array) — skipping loop`, expression: sourceExpr, }); } // Apply maxIterations cap if present (v3.16) const maxIter = step.maxIterations ? Math.min(source.length, step.maxIterations) : source.length; const startIndex = ctx._loopProgress?.get(step.id) || 0; if (mode === 'serial') { for (let i = startIndex; i < maxIter; i++) { if (ctx.aborted || ctx.status === ExecutionStatus.Stopped) break; const broke = await executeLoopIteration(engine, ctx, step, children, source[i], i); if (ctx._loopProgress) { ctx._loopProgress.set(step.id, i + 1); if (engine._emitCheckpoint) engine._emitCheckpoint(ctx); } if (broke) break; // BREAK encountered } } else { // Parallel — BREAK support: flag to stop launching new iterations const { ChildExecutionContext } = require('./types'); let breakTriggered = false; const branches = source.slice(startIndex, maxIter).map((item, offset) => { const index = startIndex + offset; return { id: `${step.id}[${index}]`, fn: async () => { if (breakTriggered) return; // Don't start new iterations after BREAK const childCtx = new ChildExecutionContext(ctx); childCtx.localVars._item = item; childCtx.localVars._index = index; childCtx.localVars._iterDir = `${step.id}_${index}`; try { for (const childID of children) { const childStep = engine.getStep(childID); if (childStep) await engine.executeStep(childCtx, childStep); } } catch (e) { if (e instanceof LoopBreakSignal) { breakTriggered = true; return; } throw e; } } }; }); await engine.parallelExecutor.execute(branches, engine.errorStrategy, ctx.signal); if (ctx._loopProgress) { ctx._loopProgress.set(step.id, maxIter); } } } /** * Execute one loop iteration. Returns true if BREAK was encountered. */ async function executeLoopIteration(engine, ctx, step, children, item, index) { ctx.setVariable('_item', item); ctx.setVariable('_index', index); ctx.setVariable('_iterDir', `${step.id}_${index}`); let broke = false; try { for (const childID of children) { const childStep = engine.getStep(childID); if (childStep) await engine.executeStep(ctx, childStep); } } catch (e) { if (e instanceof LoopBreakSignal) { broke = true; } else throw e; } finally { ctx.setVariable('_item', undefined); ctx.setVariable('_index', undefined); ctx.setVariable('_iterDir', undefined); } return broke; } // ─── Download_* (v3.14+) ──────────────────────────────────── async function executeDownloadStep(engine, ctx, step) { const ev = new ExpressionEvaluator(ctx); let url, headers = {}, timeout = 300000; // 5 min default if (typeof step.source === 'string') { url = ev.evaluateValue(step.source); } else if (step.source && typeof step.source === 'object') { url = ev.evaluateValue(step.source.url); if (step.source.headers) headers = ev.evaluateDeep(step.source.headers); if (step.source.timeout) timeout = step.source.timeout * 1000; } if (!url) throw new Error('Download source URL required'); // Fetch const controller = new AbortController(); const timer = setTimeout(() => controller.abort(), timeout); try { const resp = await fetch(url, { headers, signal: controller.signal }); if (!resp.ok) throw new Error(`Download failed: ${resp.status} ${resp.statusText}`); const data = Buffer.from(await resp.arrayBuffer()); // Determine target let targetPath = step.target ? ev.evaluateValue(step.target) : null; if (!targetPath && (step.routeByExt || step.defaultDir)) { const filename = extractFilename(url); targetPath = routePathByExt(filename, step.routeByExt, step.defaultDir); } if (!targetPath) { ctx.setVariable('_result', { size: data.length, url }); await applyOutputMapping(engine, ctx, step); ctx.setVariable('_result', undefined); return; } targetPath = interpolateFilePath(ev, targetPath); targetPath = resolveTmpPath(ctx, targetPath); await ctx.fileAdapter.write(targetPath, data, WriteMode.Overwrite); ctx.artifacts[targetPath] = targetPath; emitEvent(ctx, RunEventType.FileDone, step.id, { path: targetPath, size_bytes: data.length }); ctx.setVariable('_result', { path: targetPath }); } finally { clearTimeout(timer); } await applyOutputMapping(engine, ctx, step); ctx.setVariable('_result', undefined); } // ─── Unzip_* (v3.14+) ─────────────────────────────────────── async function executeUnzipStep(engine, ctx, step) { const ev = new ExpressionEvaluator(ctx); let sourcePath = typeof step.source === 'string' ? ev.evaluateValue(step.source) : String(step.source); sourcePath = interpolateFilePath(ev, sourcePath); sourcePath = resolveTmpPath(ctx, sourcePath); const zipData = await ctx.fileAdapter.read(sourcePath); // Use JSZip or similar — for now, require the adapter to handle it // We'll call fileAdapter.unzip if available, or throw if (!ctx.fileAdapter.unzip) throw new Error('File adapter does not support unzip'); const overwrite = step.overwrite !== false; const mode = overwrite ? WriteMode.Overwrite : WriteMode.FailIfExists; const entries = await ctx.fileAdapter.unzip(zipData); let count = 0; const files = []; for (const entry of entries) { if (entry.isDirectory) continue; // Zip-slip safety if (entry.name.startsWith('/') || entry.name.includes('..')) continue; let targetPath = routePathByExt(entry.name, step.routeByExt, step.defaultDir); if (!targetPath) continue; targetPath = resolveTmpPath(ctx, targetPath); await ctx.fileAdapter.write(targetPath, entry.data, mode); ctx.artifacts[targetPath] = targetPath; emitEvent(ctx, RunEventType.FileDone, step.id, { path: targetPath, size_bytes: entry.data.length }); files.push(targetPath); count++; } ctx.setVariable('_result', { count, files }); await applyOutputMapping(engine, ctx, step); ctx.setVariable('_result', undefined); } // ─── Pause_* (v3.15+) ─────────────────────────────────────── async function executePauseStep(engine, ctx, step) { const token = crypto.createHash('sha256') .update(`wt_${ctx.workflowID}_${step.id}_${Date.now()}`) .digest('hex'); const pauseState = { token, nodeID: step.id, seenRequestIDs: new Set(), resumed: false, _resolve: null, _promise: null }; // Create a promise that resume() will resolve pauseState._promise = new Promise(resolve => { pauseState._resolve = resolve; }); ctx.pauseState = pauseState; ctx.status = ExecutionStatus.Paused; const timeoutSec = step.timeout ? (step.timeout.sec || step.timeout.seconds || 0) : 0; const expireAt = timeoutSec > 0 ? new Date(Date.now() + timeoutSec * 1000).toISOString() : null; emitEvent(ctx, RunEventType.PauseStart, step.id, { nodeId: step.id, waitToken: token, resumeResultTarget: step.resumeResultTarget || null, reason: step.reason || null, expireAt }); // Wait for resume or timeout let result; if (timeoutSec > 0) { const timeoutMs = timeoutSec * 1000; result = await Promise.race([ pauseState._promise, new Promise(resolve => setTimeout(() => resolve({ timeout: true }), timeoutMs)) ]); } else { result = await pauseState._promise; } if (result?.timeout) { // Timeout emitEvent(ctx, RunEventType.PauseTimeout, step.id, { nodeId: step.id, expiredAt: new Date().toISOString(), timeoutAction: step.timeout.on || null }); ctx.status = ExecutionStatus.Running; ctx.pauseState = null; if (step.timeout.on) { const timeoutStep = engine.getStep(step.timeout.on); if (timeoutStep) await engine.executeStep(ctx, timeoutStep); } return { handled: true }; // Pause handled its own next routing } // Resumed if (step.resumeResultTarget && result?.payload !== undefined) { const ev = new ExpressionEvaluator(ctx); ev.setVariable(step.resumeResultTarget, result.payload); } emitEvent(ctx, RunEventType.PauseResumed, step.id, { nodeId: step.id, requestId: result?.requestId || null, resumedAt: new Date().toISOString() }); ctx.status = ExecutionStatus.Running; ctx.pauseState = null; // Pause_* handles its own step_done and next emitEvent(ctx, RunEventType.StepDone, step.id, { outputs: collectPauseOutputs(ctx, step) }); if (step.next === 'BREAK') throw new LoopBreakSignal(); if (step.next && step.next !== 'RETURN' && step.next !== 'STOP') { const nextStep = engine.getStep(step.next); if (nextStep) await engine.executeStep(ctx, nextStep); } return { handled: true }; } // ─── Output Mapping ────────────────────────────────────────── async function applyOutputMapping(engine, ctx, step) { if (!step.out) return; const ev = new ExpressionEvaluator(ctx); // Shorthand: out is a string like "$plan" → means { "$plan": "=_result" } if (typeof step.out === 'string') { ev.setVariable(step.out, ctx.getVariable('_result')); return; } for (const [target, valueExpr] of Object.entries(step.out)) { const value = ev.evaluateValue(valueExpr); if (target.startsWith('/')) { // File write const filePath = interpolateFilePath(ev, target); // Guard: reject paths that resolved to empty or bare "/" (missing filePath interpolation) if (!filePath || filePath === '/') { emitEvent(ctx, RunEventType.StepError, step.id, { error: `File write skipped: path template "${target}" resolved to empty path — missing filePath in metadata` }); continue; } const content = toBytes(value); await ctx.fileAdapter.write(filePath, content, WriteMode.Overwrite); ctx.artifacts[filePath] = filePath; emitEvent(ctx, RunEventType.FileDone, step.id, { path: filePath, size_bytes: content.length }); } else if (target.startsWith('$') || target.startsWith('_')) { ev.setVariable(target, value); } else { throw new Error(`Invalid output target: ${target}`); } } } // ─── Helpers ───────────────────────────────────────────────── function interpolateFilePath(ev, path) { let result = ''; let i = 0; while (i < path.length) { if (path[i] === '{') { const end = path.indexOf('}', i); if (end < 0) { result += path.slice(i); break; } const expr = path.slice(i + 1, end); const val = ev.evaluate(expr); result += val != null ? String(val) : ''; i = end + 1; } else { result += path[i]; i++; } } return result; } function resolveTmpPath(ctx, path) { if (!path.startsWith('.tmp/')) return path; return `.tmp/${ctx.workflowID}/${path.slice(5)}`; } function toBytes(value) { if (typeof value === 'string') return Buffer.from(value, 'utf8'); if (Buffer.isBuffer(value)) return value; return Buffer.from(JSON.stringify(value), 'utf8'); } function extractFilename(url) { try { const u = new URL(url); const parts = u.pathname.split('/'); return parts[parts.length - 1] || 'download'; } catch { return 'download'; } } function routePathByExt(filename, routeByExt, defaultDir) { if (!routeByExt && !defaultDir) return filename; const ext = filename.includes('.') ? filename.slice(filename.lastIndexOf('.')) : ''; if (routeByExt && ext && routeByExt[ext]) { return routeByExt[ext] + '/' + filename; } if (defaultDir) return defaultDir + '/' + filename; return null; // Skip } function collectPauseOutputs(ctx, step) { if (!step.resumeResultTarget) return null; const val = ctx.getVariable(step.resumeResultTarget); return { [step.resumeResultTarget]: val }; } function emitEvent(ctx, type, stepId, payload) { ctx.emitEvent({ type, stepID: stepId || null, payload: payload || {} }); } module.exports = { executeServiceStep, executeComponentStep, executeLLMStep, executeAPIStep, executeSetStep, executeWriteStep, executeBranchStep, executeLoopStep, executeDownloadStep, executeUnzipStep, executePauseStep, applyOutputMapping, emitEvent, interpolateFilePath, resolveTmpPath, LoopBreakSignal };