| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690 |
- /**
- * VL Workflow Engine — Step Executor
- * Ported from Go: workflow/executor.go
- * Spec: v3.16
- *
- * Handles all 12 step types: Service, API, Component, LLM, Set, Write,
- * Download, Unzip, Pause, Branch, Loop, Stop (+ Noop)
- *
- * v3.16 additions:
- * - Loop `while` mode (condition loop, mutually exclusive with `source`)
- * - `BREAK` keyword in Loop children (exits entire loop)
- * - LLM `model` field: `<provider>/<modelId>` / `<provider>` / omitted
- */
- const crypto = require('crypto');
- const { ExpressionEvaluator, toBool } = require('./expression');
- const { RunEventType, WriteMode, ExecutionStatus, isV310OrLater, buildErrorMap } = require('./types');
- // ─── Service_* ───────────────────────────────────────────────
- async function executeServiceStep(engine, ctx, step) {
- const serviceName = step.id.replace(/^Service_/, '');
- const ev = new ExpressionEvaluator(ctx);
- const params = ev.evaluateDeep(step.in || {});
- const result = await ctx.serviceAdapter.call(serviceName, params);
- ctx.setVariable('_result', result?.data || result);
- await applyOutputMapping(engine, ctx, step);
- ctx.setVariable('_result', undefined);
- }
- // ─── Component_* ─────────────────────────────────────────────
- async function executeComponentStep(engine, ctx, step) {
- const componentID = step.id.replace(/^Component_/, '');
- const ev = new ExpressionEvaluator(ctx);
- const params = ev.evaluateDeep(step.in || {});
- const result = await ctx.componentAdapter.call(componentID, params);
- ctx.setVariable('_result', result);
- await applyOutputMapping(engine, ctx, step);
- ctx.setVariable('_result', undefined);
- }
- // ─── LLM_* ───────────────────────────────────────────────────
- async function executeLLMStep(engine, ctx, step) {
- const ev = new ExpressionEvaluator(ctx);
- const params = ev.evaluateDeep(step.in || {});
- // v3.16 — LLM model field: <provider>/<modelId>, <provider> only, or omitted
- if (step.model) {
- const parts = step.model.split('/');
- if (parts.length === 2) {
- params._provider = parts[0];
- params.model = parts[1];
- } else if (parts.length === 1) {
- params._provider = parts[0];
- // modelId omitted — adapter picks default for provider
- }
- }
- // Inject docs into system message
- await injectDocs(ctx, params);
- // Resolve schema references
- resolveSchemaRef(engine, params);
- const isStreaming = params.stream === true;
- let streamTokens = [];
- // Extended streaming callbacks — separate thinking, response, tool_use, tool_result
- const callbacks = isStreaming ? {
- onToken: (delta) => {
- streamTokens.push(delta);
- emitEvent(ctx, RunEventType.LLMToken, step.id, { delta });
- },
- onThinking: (delta) => {
- emitEvent(ctx, RunEventType.LLMThinking, step.id, { delta });
- },
- onToolUse: (toolUse) => {
- emitEvent(ctx, RunEventType.LLMToolUse, step.id, {
- tool_use_id: toolUse.id,
- name: toolUse.name,
- input: toolUse.input
- });
- },
- onToolResult: (toolResult) => {
- emitEvent(ctx, RunEventType.LLMToolResult, step.id, {
- tool_use_id: toolResult.tool_use_id,
- content: toolResult.content,
- is_error: toolResult.is_error || false
- });
- }
- } : null;
- // Backward-compatible: pass both onToken (legacy) and callbacks (extended)
- const onToken = callbacks?.onToken || null;
- const callStart = Date.now();
- let result, error;
- try {
- result = await ctx.llmAdapter.call(params, onToken, callbacks);
- } catch (e) {
- error = e;
- }
- const latencyMs = Date.now() - callStart;
- if (error) {
- emitEvent(ctx, RunEventType.LLMError, step.id, {
- error: error.message,
- type: error.type || 'unknown',
- code: error.code || '',
- retryable: error.retryable || false,
- latency_ms: latencyMs
- });
- if (isV310OrLater(ctx.version)) {
- ctx.setVariable('_meta', buildMetaFromError(error, latencyMs));
- }
- throw error;
- }
- // Emit llm_done
- const usage = result.usage || {};
- emitEvent(ctx, RunEventType.LLMDone, step.id, {
- latency_ms: latencyMs,
- finish_reason: result.finish_reason || result.stop_reason || 'stop',
- model: result.model || '',
- usage: {
- input_tokens: usage.input_tokens || usage.prompt_tokens || 0,
- output_tokens: usage.output_tokens || usage.completion_tokens || 0,
- total_tokens: (usage.input_tokens || usage.prompt_tokens || 0) + (usage.output_tokens || usage.completion_tokens || 0)
- },
- // Include thinking summary if available
- thinking_tokens: usage.thinking_tokens || usage.cache_creation_input_tokens || 0,
- has_tool_use: !!(result.tool_calls || result.tool_use || []).length
- });
- // Apply result
- if (isV310OrLater(ctx.version)) {
- // v3.10+: _result = content only, _meta = metadata
- let content = result.content || '';
- // Unwrap json_object / json_schema structured output
- const formatType = params.output_config?.format?.type || params.output_config?.type;
- if (typeof content === 'string' && (formatType === 'json_object' || formatType === 'json_schema')) {
- try { content = JSON.parse(content); } catch {}
- }
- ctx.setVariable('_result', content);
- ctx.setVariable('_meta', buildMeta(result, latencyMs));
- } else {
- // v3.6–v3.9: _result = full result or just content for structured
- ctx.setVariable('_result', result);
- }
- await applyOutputMapping(engine, ctx, step);
- ctx.setVariable('_result', undefined);
- if (isV310OrLater(ctx.version)) ctx.setVariable('_meta', undefined);
- }
- function buildMeta(result, latencyMs) {
- const model = result.model || '';
- let provider = 'unknown';
- if (model.includes('claude')) provider = 'anthropic';
- else if (model.includes('gpt') || model.includes('o1') || model.includes('o3') || model.includes('o4')) provider = 'openai';
- const usage = result.usage || {};
- return {
- latency_ms: latencyMs,
- model, model_resolved: model, provider,
- finish_reason: result.finish_reason || result.stop_reason || 'stop',
- response_id: result.id || result.response_id || undefined,
- usage: {
- raw: usage,
- input_tokens: usage.input_tokens || usage.prompt_tokens || 0,
- output_tokens: usage.output_tokens || usage.completion_tokens || 0,
- total_tokens: (usage.input_tokens || usage.prompt_tokens || 0) + (usage.output_tokens || usage.completion_tokens || 0)
- }
- };
- }
- function buildMetaFromError(err, latencyMs) {
- return { latency_ms: latencyMs, error: buildErrorMap(err) };
- }
- async function injectDocs(ctx, params) {
- if (!params.docs || !ctx.docAdapter) return;
- const docIds = Array.isArray(params.docs) ? params.docs : [params.docs];
- const parts = [];
- for (const id of docIds) {
- try {
- const content = await ctx.docAdapter.get(String(id));
- if (content) parts.push(content);
- } catch {}
- }
- if (parts.length > 0) {
- const docsBlock = parts.join('\n\n---\n\n');
- if (params.system) params.system = params.system + '\n\n' + docsBlock;
- else params.system = docsBlock;
- }
- delete params.docs;
- }
- function resolveSchemaRef(engine, params) {
- if (!params.output_config?.schemaRef || !engine.registry) return;
- const ref = params.output_config.schemaRef;
- const schema = engine.registry.getSchema(ref);
- if (schema) {
- params.output_config.schema = schema;
- delete params.output_config.schemaRef;
- }
- }
- // ─── API_* ───────────────────────────────────────────────────
- async function executeAPIStep(engine, ctx, step) {
- const apiID = step.id.replace(/^API_/, '');
- const apiDef = engine.registry.getAPIDefinition(apiID);
- if (!apiDef) throw new Error(`API definition not found: ${apiID}`);
- const ev = new ExpressionEvaluator(ctx);
- const params = ev.evaluateDeep(step.in || {});
- // Resolve auth token
- if (apiDef.auth) {
- const authVal = apiDef.auth.startsWith('=') ? ev.evaluateValue(apiDef.auth) : ev.evaluate(apiDef.auth);
- if (authVal != null) params.authToken = authVal;
- }
- const result = await ctx.apiAdapter.call(apiDef, params);
- ctx.setVariable('_result', result);
- await applyOutputMapping(engine, ctx, step);
- ctx.setVariable('_result', undefined);
- }
- // ─── Set_* ───────────────────────────────────────────────────
- function executeSetStep(engine, ctx, step) {
- // Set steps may only have `out` (file writes handled by applyOutputMapping)
- if (!step.value && !step.target) return;
- const ev = new ExpressionEvaluator(ctx);
- const value = ev.evaluateValue(step.value);
- ev.setVariable(step.target, value);
- }
- // ─── Write_* ─────────────────────────────────────────────────
- async function executeWriteStep(engine, ctx, step) {
- const ev = new ExpressionEvaluator(ctx);
- let targetPath = ev.evaluateValue(step.target);
- if (typeof targetPath !== 'string') throw new Error(`Write target must be string, got ${typeof targetPath}`);
- targetPath = interpolateFilePath(ev, targetPath);
- targetPath = resolveTmpPath(ctx, targetPath);
- const value = ev.evaluateValue(step.value);
- const content = toBytes(value);
- const mode = step.mode || WriteMode.Overwrite;
- await ctx.fileAdapter.write(targetPath, content, mode);
- ctx.artifacts[targetPath] = targetPath;
- emitEvent(ctx, RunEventType.FileDone, step.id, { path: targetPath, size_bytes: content.length });
- }
- // ─── Branch_* / Check_* ──────────────────────────────────────
- function executeBranchStep(engine, ctx, step) {
- const ev = new ExpressionEvaluator(ctx);
- // Check_* style: condition / if_true / if_false
- if (step.condition !== undefined) {
- const expr = step.condition;
- const val = (typeof expr === 'string' && expr.startsWith('='))
- ? ev.evaluateValue(expr) : ev.evaluate(expr);
- return toBool(val) ? (step.if_true || null) : (step.if_false || null);
- }
- // Standard Branch_* style: cases array
- for (const cas of (step.cases || [])) {
- // Support both formats: [expr, stepId] and {condition, next}
- let expr, stepId;
- if (Array.isArray(cas)) {
- if (cas.length < 2) continue;
- [expr, stepId] = cas;
- } else if (cas && typeof cas === 'object') {
- expr = cas.condition;
- stepId = cas.next || cas.step;
- if (!expr || !stepId) continue;
- } else continue;
- if (expr === 'ELSE') {
- return stepId;
- }
- const val = (typeof expr === 'string' && expr.startsWith('='))
- ? ev.evaluateValue(expr) : ev.evaluate(expr);
- if (toBool(val)) return stepId;
- }
- // Default fallback (for object-style cases)
- return step.default || null;
- }
- // ─── Loop_* ──────────────────────────────────────────────────
- // Sentinel thrown inside loop children when `next: "BREAK"` is encountered.
- class LoopBreakSignal { constructor() { this.type = 'LOOP_BREAK'; } }
- async function executeLoopStep(engine, ctx, step) {
- const ev = new ExpressionEvaluator(ctx);
- const mode = step.mode || 'serial';
- const children = step.children || [];
- const isWhileMode = step.while != null;
- // ─── While mode (v3.16) ─────────────────────────────
- if (isWhileMode) {
- const maxIter = step.maxIterations || Infinity;
- let i = ctx._loopProgress?.get(step.id) || 0;
- while (i < maxIter) {
- if (ctx.aborted || ctx.status === ExecutionStatus.Stopped) break;
- // Evaluate while condition before each iteration
- const condVal = ev.evaluateValue(step.while);
- if (!toBool(condVal)) break;
- // Set loop locals (_item is not available in while mode, _index is)
- ctx.setVariable('_index', i);
- ctx.setVariable('_iterDir', `${step.id}_${i}`);
- let broke = false;
- try {
- for (const childID of children) {
- const childStep = engine.getStep(childID);
- if (childStep) await engine.executeStep(ctx, childStep);
- }
- } catch (e) {
- if (e instanceof LoopBreakSignal) { broke = true; }
- else throw e;
- } finally {
- ctx.setVariable('_index', undefined);
- ctx.setVariable('_iterDir', undefined);
- }
- i++;
- if (ctx._loopProgress) {
- ctx._loopProgress.set(step.id, i);
- if (engine._emitCheckpoint) engine._emitCheckpoint(ctx);
- }
- if (broke) break;
- }
- return; // while mode always serial, done
- }
- // ─── Source (array) mode ────────────────────────────
- let sourceExpr = step.source;
- if (typeof sourceExpr === 'string' && !sourceExpr.startsWith('=')) sourceExpr = '=' + sourceExpr;
- const rawSource = ev.evaluateValue(sourceExpr);
- const source = Array.isArray(rawSource) ? rawSource : [];
- if (!Array.isArray(rawSource)) {
- emitEvent(ctx, RunEventType.StepSkipped, step.id, {
- reason: `Loop source is ${rawSource === undefined ? 'undefined' : typeof rawSource} (expected array) — skipping loop`,
- expression: sourceExpr,
- });
- }
- // Apply maxIterations cap if present (v3.16)
- const maxIter = step.maxIterations ? Math.min(source.length, step.maxIterations) : source.length;
- const startIndex = ctx._loopProgress?.get(step.id) || 0;
- if (mode === 'serial') {
- for (let i = startIndex; i < maxIter; i++) {
- if (ctx.aborted || ctx.status === ExecutionStatus.Stopped) break;
- const broke = await executeLoopIteration(engine, ctx, step, children, source[i], i);
- if (ctx._loopProgress) {
- ctx._loopProgress.set(step.id, i + 1);
- if (engine._emitCheckpoint) engine._emitCheckpoint(ctx);
- }
- if (broke) break; // BREAK encountered
- }
- } else {
- // Parallel — BREAK support: flag to stop launching new iterations
- const { ChildExecutionContext } = require('./types');
- let breakTriggered = false;
- const branches = source.slice(startIndex, maxIter).map((item, offset) => {
- const index = startIndex + offset;
- return {
- id: `${step.id}[${index}]`,
- fn: async () => {
- if (breakTriggered) return; // Don't start new iterations after BREAK
- const childCtx = new ChildExecutionContext(ctx);
- childCtx.localVars._item = item;
- childCtx.localVars._index = index;
- childCtx.localVars._iterDir = `${step.id}_${index}`;
- try {
- for (const childID of children) {
- const childStep = engine.getStep(childID);
- if (childStep) await engine.executeStep(childCtx, childStep);
- }
- } catch (e) {
- if (e instanceof LoopBreakSignal) { breakTriggered = true; return; }
- throw e;
- }
- }
- };
- });
- await engine.parallelExecutor.execute(branches, engine.errorStrategy, ctx.signal);
- if (ctx._loopProgress) {
- ctx._loopProgress.set(step.id, maxIter);
- }
- }
- }
- /**
- * Execute one loop iteration. Returns true if BREAK was encountered.
- */
- async function executeLoopIteration(engine, ctx, step, children, item, index) {
- ctx.setVariable('_item', item);
- ctx.setVariable('_index', index);
- ctx.setVariable('_iterDir', `${step.id}_${index}`);
- let broke = false;
- try {
- for (const childID of children) {
- const childStep = engine.getStep(childID);
- if (childStep) await engine.executeStep(ctx, childStep);
- }
- } catch (e) {
- if (e instanceof LoopBreakSignal) { broke = true; }
- else throw e;
- } finally {
- ctx.setVariable('_item', undefined);
- ctx.setVariable('_index', undefined);
- ctx.setVariable('_iterDir', undefined);
- }
- return broke;
- }
- // ─── Download_* (v3.14+) ────────────────────────────────────
- async function executeDownloadStep(engine, ctx, step) {
- const ev = new ExpressionEvaluator(ctx);
- let url, headers = {}, timeout = 300000; // 5 min default
- if (typeof step.source === 'string') {
- url = ev.evaluateValue(step.source);
- } else if (step.source && typeof step.source === 'object') {
- url = ev.evaluateValue(step.source.url);
- if (step.source.headers) headers = ev.evaluateDeep(step.source.headers);
- if (step.source.timeout) timeout = step.source.timeout * 1000;
- }
- if (!url) throw new Error('Download source URL required');
- // Fetch
- const controller = new AbortController();
- const timer = setTimeout(() => controller.abort(), timeout);
- try {
- const resp = await fetch(url, { headers, signal: controller.signal });
- if (!resp.ok) throw new Error(`Download failed: ${resp.status} ${resp.statusText}`);
- const data = Buffer.from(await resp.arrayBuffer());
- // Determine target
- let targetPath = step.target ? ev.evaluateValue(step.target) : null;
- if (!targetPath && (step.routeByExt || step.defaultDir)) {
- const filename = extractFilename(url);
- targetPath = routePathByExt(filename, step.routeByExt, step.defaultDir);
- }
- if (!targetPath) {
- ctx.setVariable('_result', { size: data.length, url });
- await applyOutputMapping(engine, ctx, step);
- ctx.setVariable('_result', undefined);
- return;
- }
- targetPath = interpolateFilePath(ev, targetPath);
- targetPath = resolveTmpPath(ctx, targetPath);
- await ctx.fileAdapter.write(targetPath, data, WriteMode.Overwrite);
- ctx.artifacts[targetPath] = targetPath;
- emitEvent(ctx, RunEventType.FileDone, step.id, { path: targetPath, size_bytes: data.length });
- ctx.setVariable('_result', { path: targetPath });
- } finally {
- clearTimeout(timer);
- }
- await applyOutputMapping(engine, ctx, step);
- ctx.setVariable('_result', undefined);
- }
- // ─── Unzip_* (v3.14+) ───────────────────────────────────────
- async function executeUnzipStep(engine, ctx, step) {
- const ev = new ExpressionEvaluator(ctx);
- let sourcePath = typeof step.source === 'string' ? ev.evaluateValue(step.source) : String(step.source);
- sourcePath = interpolateFilePath(ev, sourcePath);
- sourcePath = resolveTmpPath(ctx, sourcePath);
- const zipData = await ctx.fileAdapter.read(sourcePath);
- // Use JSZip or similar — for now, require the adapter to handle it
- // We'll call fileAdapter.unzip if available, or throw
- if (!ctx.fileAdapter.unzip) throw new Error('File adapter does not support unzip');
- const overwrite = step.overwrite !== false;
- const mode = overwrite ? WriteMode.Overwrite : WriteMode.FailIfExists;
- const entries = await ctx.fileAdapter.unzip(zipData);
- let count = 0;
- const files = [];
- for (const entry of entries) {
- if (entry.isDirectory) continue;
- // Zip-slip safety
- if (entry.name.startsWith('/') || entry.name.includes('..')) continue;
- let targetPath = routePathByExt(entry.name, step.routeByExt, step.defaultDir);
- if (!targetPath) continue;
- targetPath = resolveTmpPath(ctx, targetPath);
- await ctx.fileAdapter.write(targetPath, entry.data, mode);
- ctx.artifacts[targetPath] = targetPath;
- emitEvent(ctx, RunEventType.FileDone, step.id, { path: targetPath, size_bytes: entry.data.length });
- files.push(targetPath);
- count++;
- }
- ctx.setVariable('_result', { count, files });
- await applyOutputMapping(engine, ctx, step);
- ctx.setVariable('_result', undefined);
- }
- // ─── Pause_* (v3.15+) ───────────────────────────────────────
- async function executePauseStep(engine, ctx, step) {
- const token = crypto.createHash('sha256')
- .update(`wt_${ctx.workflowID}_${step.id}_${Date.now()}`)
- .digest('hex');
- const pauseState = {
- token,
- nodeID: step.id,
- seenRequestIDs: new Set(),
- resumed: false,
- _resolve: null,
- _promise: null
- };
- // Create a promise that resume() will resolve
- pauseState._promise = new Promise(resolve => { pauseState._resolve = resolve; });
- ctx.pauseState = pauseState;
- ctx.status = ExecutionStatus.Paused;
- const timeoutSec = step.timeout ? (step.timeout.sec || step.timeout.seconds || 0) : 0;
- const expireAt = timeoutSec > 0 ? new Date(Date.now() + timeoutSec * 1000).toISOString() : null;
- emitEvent(ctx, RunEventType.PauseStart, step.id, {
- nodeId: step.id,
- waitToken: token,
- resumeResultTarget: step.resumeResultTarget || null,
- reason: step.reason || null,
- expireAt
- });
- // Wait for resume or timeout
- let result;
- if (timeoutSec > 0) {
- const timeoutMs = timeoutSec * 1000;
- result = await Promise.race([
- pauseState._promise,
- new Promise(resolve => setTimeout(() => resolve({ timeout: true }), timeoutMs))
- ]);
- } else {
- result = await pauseState._promise;
- }
- if (result?.timeout) {
- // Timeout
- emitEvent(ctx, RunEventType.PauseTimeout, step.id, {
- nodeId: step.id, expiredAt: new Date().toISOString(),
- timeoutAction: step.timeout.on || null
- });
- ctx.status = ExecutionStatus.Running;
- ctx.pauseState = null;
- if (step.timeout.on) {
- const timeoutStep = engine.getStep(step.timeout.on);
- if (timeoutStep) await engine.executeStep(ctx, timeoutStep);
- }
- return { handled: true }; // Pause handled its own next routing
- }
- // Resumed
- if (step.resumeResultTarget && result?.payload !== undefined) {
- const ev = new ExpressionEvaluator(ctx);
- ev.setVariable(step.resumeResultTarget, result.payload);
- }
- emitEvent(ctx, RunEventType.PauseResumed, step.id, {
- nodeId: step.id, requestId: result?.requestId || null,
- resumedAt: new Date().toISOString()
- });
- ctx.status = ExecutionStatus.Running;
- ctx.pauseState = null;
- // Pause_* handles its own step_done and next
- emitEvent(ctx, RunEventType.StepDone, step.id, {
- outputs: collectPauseOutputs(ctx, step)
- });
- if (step.next === 'BREAK') throw new LoopBreakSignal();
- if (step.next && step.next !== 'RETURN' && step.next !== 'STOP') {
- const nextStep = engine.getStep(step.next);
- if (nextStep) await engine.executeStep(ctx, nextStep);
- }
- return { handled: true };
- }
- // ─── Output Mapping ──────────────────────────────────────────
- async function applyOutputMapping(engine, ctx, step) {
- if (!step.out) return;
- const ev = new ExpressionEvaluator(ctx);
- // Shorthand: out is a string like "$plan" → means { "$plan": "=_result" }
- if (typeof step.out === 'string') {
- ev.setVariable(step.out, ctx.getVariable('_result'));
- return;
- }
- for (const [target, valueExpr] of Object.entries(step.out)) {
- const value = ev.evaluateValue(valueExpr);
- if (target.startsWith('/')) {
- // File write
- const filePath = interpolateFilePath(ev, target);
- // Guard: reject paths that resolved to empty or bare "/" (missing filePath interpolation)
- if (!filePath || filePath === '/') {
- emitEvent(ctx, RunEventType.StepError, step.id, {
- error: `File write skipped: path template "${target}" resolved to empty path — missing filePath in metadata`
- });
- continue;
- }
- const content = toBytes(value);
- await ctx.fileAdapter.write(filePath, content, WriteMode.Overwrite);
- ctx.artifacts[filePath] = filePath;
- emitEvent(ctx, RunEventType.FileDone, step.id, { path: filePath, size_bytes: content.length });
- } else if (target.startsWith('$') || target.startsWith('_')) {
- ev.setVariable(target, value);
- } else {
- throw new Error(`Invalid output target: ${target}`);
- }
- }
- }
- // ─── Helpers ─────────────────────────────────────────────────
- function interpolateFilePath(ev, path) {
- let result = '';
- let i = 0;
- while (i < path.length) {
- if (path[i] === '{') {
- const end = path.indexOf('}', i);
- if (end < 0) { result += path.slice(i); break; }
- const expr = path.slice(i + 1, end);
- const val = ev.evaluate(expr);
- result += val != null ? String(val) : '';
- i = end + 1;
- } else {
- result += path[i];
- i++;
- }
- }
- return result;
- }
- function resolveTmpPath(ctx, path) {
- if (!path.startsWith('.tmp/')) return path;
- return `.tmp/${ctx.workflowID}/${path.slice(5)}`;
- }
- function toBytes(value) {
- if (typeof value === 'string') return Buffer.from(value, 'utf8');
- if (Buffer.isBuffer(value)) return value;
- return Buffer.from(JSON.stringify(value), 'utf8');
- }
- function extractFilename(url) {
- try {
- const u = new URL(url);
- const parts = u.pathname.split('/');
- return parts[parts.length - 1] || 'download';
- } catch { return 'download'; }
- }
- function routePathByExt(filename, routeByExt, defaultDir) {
- if (!routeByExt && !defaultDir) return filename;
- const ext = filename.includes('.') ? filename.slice(filename.lastIndexOf('.')) : '';
- if (routeByExt && ext && routeByExt[ext]) {
- return routeByExt[ext] + '/' + filename;
- }
- if (defaultDir) return defaultDir + '/' + filename;
- return null; // Skip
- }
- function collectPauseOutputs(ctx, step) {
- if (!step.resumeResultTarget) return null;
- const val = ctx.getVariable(step.resumeResultTarget);
- return { [step.resumeResultTarget]: val };
- }
- function emitEvent(ctx, type, stepId, payload) {
- ctx.emitEvent({ type, stepID: stepId || null, payload: payload || {} });
- }
- module.exports = {
- executeServiceStep, executeComponentStep, executeLLMStep,
- executeAPIStep, executeSetStep, executeWriteStep,
- executeBranchStep, executeLoopStep,
- executeDownloadStep, executeUnzipStep, executePauseStep,
- applyOutputMapping, emitEvent, interpolateFilePath, resolveTmpPath,
- LoopBreakSignal
- };
|