/**
 * VL Workflow Engine — Main Engine
 * Ported from Go: workflow/engine.go
 * Spec: v3.16
 *
 * Usage:
 *   const engine = new Engine(workflow, { onEvent: event => console.log(event) });
 *   const ctx = await engine.execute(initialVars, adapters);
 *
 * execute() only resolves after the run has settled, so event listeners must be
 * registered up front through `options.onEvent` (it is attached to every context
 * the engine creates). The running context is also exposed as `engine.activeCtx`.
 */
const { Registry, parseParamDeclaration, parseVariableDeclaration, validateRegistry } = require('./registry');
const { ParallelExecutor } = require('./parallel');
const {
  getStepType,
  resolveStepBehavior,
  ExecutionStatus,
  RunEventType,
  ParallelErrorStrategy,
  SUPPORTED_VERSIONS,
  isV310OrLater,
  RESERVED_NEXT_KEYWORDS,
  buildErrorMap,
  ExecutionContext,
  ChildExecutionContext,
  structuredCloneJSON,
} = require('./types');
const { ExpressionEvaluator, toBool } = require('./expression');
const {
  executeServiceStep,
  executeComponentStep,
  executeLLMStep,
  executeAPIStep,
  executeSetStep,
  executeWriteStep,
  executeBranchStep,
  executeLoopStep,
  executeDownloadStep,
  executeUnzipStep,
  executePauseStep,
  emitEvent,
  LoopBreakSignal,
} = require('./executor');

/**
 * `onError` keywords handled by the engine itself (instead of naming a handler step).
 * Shared by validate() and executeStep() so both agree on the accepted set.
 */
const ONERROR_BUILTINS = new Set(['skip', 'retry', 'ignore', 'continue']);

class Engine {
  /**
   * @param {object} workflow — parsed workflow JSON
   * @param {object} [options]
   * @param {number} [options.maxConcurrency] — passed to ParallelExecutor (0 when omitted)
   * @param {string} [options.errorStrategy] — parallel error strategy (default: ParallelErrorStrategy.FailFast)
   * @param {Object<string, Function>} [options.customHandlers] — step-ID prefix → async (engine, ctx, step) => {...}
   * @param {Function} [options.onEvent] — listener attached to every context created by execute()/executeFrom()
   * @param {Function} [options.onCheckpoint] — called with ctx.checkpoint() after step / branch completion
   */
  constructor(workflow, options = {}) {
    this.workflow = workflow;
    this.registry = new Registry(workflow.registry || {});
    this.stepMap = new Map();
    for (const step of (workflow.steps || [])) {
      this.stepMap.set(step.id, step);
    }
    this.errorStrategy = options.errorStrategy || ParallelErrorStrategy.FailFast;
    this.parallelExecutor = new ParallelExecutor(options.maxConcurrency || 0);
    // Custom step handlers: { Prefix: async (engine, ctx, step) => {...} }
    this.customHandlers = options.customHandlers || {};
    // Event listener attached to every ExecutionContext created by execute()/executeFrom()
    this._onEvent = options.onEvent || null;
    // Checkpoint callback: called after each step completes with ctx.checkpoint()
    this._onCheckpoint = options.onCheckpoint || null;
  }

  /**
   * Look up the custom handler registered for a step's ID prefix (text before the first `_`).
   * Only own properties of `customHandlers` count, so prefixes such as `constructor`
   * or `toString` never resolve to Object.prototype members.
   * @param {string} stepID
   * @returns {Function|null}
   */
  _getCustomHandler(stepID) {
    const prefix = stepID.split('_')[0];
    if (!Object.prototype.hasOwnProperty.call(this.customHandlers, prefix)) return null;
    return this.customHandlers[prefix] || null;
  }

  /** Check if a step prefix has a custom handler. */
  _isCustomStep(stepID) {
    return !!this._getCustomHandler(stepID);
  }

  /** @returns {object|null} the step definition with this ID, or null. */
  getStep(id) {
    return this.stepMap.get(id) || null;
  }

  // ─── Validate ──────────────────────────────────────────────

  /**
   * Statically validate the workflow definition.
   * @returns {string[]} human-readable error messages (empty when valid)
   */
  validate() {
    const errors = [];
    const wf = this.workflow;
    const steps = wf.steps || [];

    if (!SUPPORTED_VERSIONS.has(wf.version)) errors.push(`Unsupported version: ${wf.version}`);
    if (!wf.name) errors.push('Workflow name required');
    if (!wf.steps || wf.steps.length === 0) errors.push('At least one step required');

    // Registry validation
    errors.push(...validateRegistry(wf.registry));

    // Step validation
    const ids = new Set();
    for (const step of steps) {
      if (ids.has(step.id)) errors.push(`Duplicate step ID: ${step.id}`);
      ids.add(step.id);
      const type = getStepType(step.id);
      if (!type && !this._isCustomStep(step.id)) errors.push(`Unknown step type: ${step.id}`);
      if (type === 'Stop' && (step.next || (step.children && step.children.length))) {
        errors.push(`Stop_* cannot have next or children: ${step.id}`);
      }
    }

    // IDs listed as children of any Loop step — the only steps allowed to use `next: "BREAK"`.
    // Built once instead of rescanning every step for each BREAK.
    const loopChildIDs = new Set();
    for (const s of steps) {
      if (getStepType(s.id) === 'Loop') {
        for (const childId of (s.children || [])) loopChildIDs.add(childId);
      }
    }

    // Reference integrity
    for (const step of steps) {
      if (step.next && !RESERVED_NEXT_KEYWORDS.has(step.next) && !ids.has(step.next)) {
        errors.push(`Step ${step.id}: next references unknown step ${step.next}`);
      }
      for (const childId of (step.children || [])) {
        if (!ids.has(childId)) errors.push(`Step ${step.id}: child references unknown step ${childId}`);
      }
      // Cases come either as [condition, stepID] tuples or as { next | step } objects.
      for (const cas of (step.cases || [])) {
        if (Array.isArray(cas) && cas.length >= 2 && !ids.has(cas[1])) {
          errors.push(`Step ${step.id}: case references unknown step ${cas[1]}`);
        }
        if (cas && typeof cas === 'object' && !Array.isArray(cas)) {
          const ref = cas.next || cas.step;
          if (ref && !ids.has(ref)) errors.push(`Step ${step.id}: case references unknown step ${ref}`);
        }
      }
      if (step.default && !ids.has(step.default)) {
        errors.push(`Step ${step.id}: default references unknown step ${step.default}`);
      }
      // Check_* style: if_true / if_false references
      if (step.if_true && !ids.has(step.if_true)) {
        errors.push(`Step ${step.id}: if_true references unknown step ${step.if_true}`);
      }
      if (step.if_false && !ids.has(step.if_false)) {
        errors.push(`Step ${step.id}: if_false references unknown step ${step.if_false}`);
      }
      // onError can be a step ID or a builtin keyword (skip, retry, ignore, continue)
      if (step.onError && !ids.has(step.onError) && !ONERROR_BUILTINS.has(step.onError)) {
        errors.push(`Step ${step.id}: onError references unknown step ${step.onError}`);
      }
      if (step.timeout?.on && !ids.has(step.timeout.on)) {
        errors.push(`Step ${step.id}: timeout.on references unknown step ${step.timeout.on}`);
      }

      const sType = getStepType(step.id);

      // v3.16 — Loop: while and source are mutually exclusive
      if (sType === 'Loop') {
        if (step.while != null && step.source != null) {
          errors.push(`Step ${step.id}: Loop cannot have both "while" and "source" (mutually exclusive)`);
        }
        // Number.isInteger also rejects non-numbers, so fractional values no longer slip through.
        if (step.maxIterations != null && (!Number.isInteger(step.maxIterations) || step.maxIterations < 1)) {
          errors.push(`Step ${step.id}: maxIterations must be a positive integer`);
        }
      }

      // v3.16 — BREAK only valid inside Loop children
      if (step.next === 'BREAK' && !loopChildIDs.has(step.id)) {
        errors.push(`Step ${step.id}: BREAK is only valid inside Loop children`);
      }

      // v3.16 — LLM model format: "<provider>/<model>" or "<model>" or omitted.
      // A non-string model is reported instead of crashing on .split().
      if (sType === 'LLM' && step.model) {
        if (typeof step.model !== 'string' || step.model.split('/').length > 2) {
          errors.push(`Step ${step.id}: model format must be "<provider>/<model>" or "<model>"`);
        }
      }
    }

    return errors;
  }

  // ─── Execute ───────────────────────────────────────────────

  /**
   * Execute the workflow from its entry node(s).
   * @param {object} initialVars — initial param/variable values; undeclared keys that do not
   *   start with `$` are passed through as params
   * @param {object} adapters — { service, api, component, llm, file, doc }
   * @param {object} [runParams] — optional run-level params; `runParams.params` overrides params
   * @returns {Promise<ExecutionContext>}
   * @throws the first unhandled step error, after WorkflowFailed has been emitted
   */
  async execute(initialVars = {}, adapters = {}, runParams = null) {
    const workflowID = `wf_${Date.now()}`;
    const paramDecls = this.registry.getParamDeclarations();
    const varDecls = this.registry.getVariableDeclarations();

    // Initialize params (read-only)
    const params = {};
    const paramTypes = {};
    for (const [name, decl] of Object.entries(paramDecls)) {
      paramTypes[name] = decl.type;
      if (initialVars[name] !== undefined) {
        params[name] = applyParamType(decl.type, initialVars[name]);
      } else if (decl.default !== undefined) {
        params[name] = decl.default;
      }
    }

    // Store extra initialVars not declared in registry as params (pass-through)
    for (const [name, value] of Object.entries(initialVars)) {
      if (!(name in paramDecls) && !name.startsWith('$')) {
        params[name] = value;
      }
    }

    // RunParams override
    if (runParams?.params) {
      for (const [k, v] of Object.entries(runParams.params)) {
        params[k] = k in paramDecls ? applyParamType(paramDecls[k].type, v) : v;
      }
    }

    // Initialize variables
    const variables = {};
    const varTypes = {};
    for (const [name, decl] of Object.entries(varDecls)) {
      varTypes[name] = decl.type;
      variables[name] = initialVars[name] !== undefined ? applyVarType(decl.type, initialVars[name]) : null;
    }

    const ctx = new ExecutionContext({
      workflowID,
      params,
      paramTypes,
      variables,
      varTypes,
      adapters,
      runParams,
      version: this.workflow.version || '3.15',
    });

    // Expose active context on engine instance (for external resume/pause access)
    this.activeCtx = ctx;

    // Attach event listener if provided
    if (this._onEvent) ctx.onEvent(this._onEvent);

    return this._runWorkflow(
      ctx,
      { name: this.workflow.name, version: this.workflow.version, params },
      async () => {
        const entryIDs = this._findEntryNodeIDs();
        if (entryIDs.length === 0) throw new Error('No entry nodes found');
        if (entryIDs.length === 1) {
          await this.executeStep(ctx, this.getStep(entryIDs[0]));
        } else {
          // Several roots: run them as parallel branches under a synthetic parent ID.
          await this._executeChildren(ctx, entryIDs, '_entry');
        }
      },
    );
  }

  /**
   * Shared run lifecycle for execute() and executeFrom(): emit WorkflowStart, run `body`,
   * mark a still-Running context Completed (emitting WorkflowDone); on any thrown error
   * mark it Failed, emit WorkflowFailed and rethrow.
   * @param {ExecutionContext} ctx
   * @param {object} startPayload — WorkflowStart event payload
   * @param {() => Promise<void>} body
   * @returns {Promise<ExecutionContext>}
   */
  async _runWorkflow(ctx, startPayload, body) {
    try {
      emitEvent(ctx, RunEventType.WorkflowStart, null, startPayload);
      await body();
      // Stopped / Paused runs keep their status; only a normal finish is Completed.
      if (ctx.status === ExecutionStatus.Running) {
        ctx.status = ExecutionStatus.Completed;
        emitEvent(ctx, RunEventType.WorkflowDone, null, { duration_ms: Date.now() - ctx.startTime });
      }
    } catch (err) {
      ctx.status = ExecutionStatus.Failed;
      emitEvent(ctx, RunEventType.WorkflowFailed, null, {
        error: err.message,
        failed_step_id: ctx.currentStepID,
        duration_ms: Date.now() - ctx.startTime,
      });
      throw err;
    }
    return ctx;
  }

  /**
   * Resume a paused workflow.
   * @param {ExecutionContext} ctx — the paused context
   * @param {{ token: string, requestId?: string, payload?: any }} request
   * @throws {Error} when the context is not paused or the token does not match
   */
  resume(ctx, request) {
    if (!ctx.pauseState) throw new Error('Workflow is not paused');
    if (ctx.pauseState.token !== request.token) {
      emitEvent(ctx, RunEventType.PauseRejected, ctx.pauseState.nodeID, { reason: 'invalid_token' });
      throw new Error('Invalid pause token');
    }
    // Idempotency: a repeated requestId is ignored.
    if (request.requestId && ctx.pauseState.seenRequestIDs.has(request.requestId)) return;
    if (request.requestId) ctx.pauseState.seenRequestIDs.add(request.requestId);
    ctx.pauseState._resolve({ payload: request.payload, requestId: request.requestId });
  }

  // ─── Execute From Checkpoint / Step ID ─────────────────────

  /**
   * Resume execution from a checkpoint or a specific step.
   *
   * Usage 1 — from checkpoint (crash recovery):
   *   const cp = JSON.parse(fs.readFileSync('checkpoint.json'));
   *   const ctx = await engine.executeFrom(cp, adapters);
   *
   * Usage 2 — from step ID with variable overrides (re-run):
   *   const ctx = await engine.executeFrom({
   *     currentStepID: 'LLM_GenerateCode',
   *     variables: { '$plan': modifiedPlan },
   *     params: originalParams
   *   }, adapters);
   *
   * @param {object} checkpoint — checkpoint object or { currentStepID, params?, variables? }
   * @param {object} adapters — { service, api, component, llm, file, doc }
   * @param {object} [overrides] — applied after restore; `$`/`_` keys become variables, others params
   * @returns {Promise<ExecutionContext>}
   * @throws {Error} when currentStepID is missing or unknown, or the resumed run fails
   */
  async executeFrom(checkpoint, adapters = {}, overrides = null) {
    if (!checkpoint?.currentStepID) throw new Error('checkpoint.currentStepID is required');
    const startStepID = checkpoint.currentStepID;
    const startStep = this.getStep(startStepID);
    if (!startStep) throw new Error(`Step not found: ${startStepID}`);

    // Rebuild context from checkpoint data. params/variables/localVars go through
    // structuredCloneJSON; NOTE(review): artifacts/types are shared by reference with the checkpoint.
    const workflowID = checkpoint.workflowID || `wf_resume_${Date.now()}`;
    const ctx = new ExecutionContext({
      workflowID,
      params: checkpoint.params ? structuredCloneJSON(checkpoint.params) : {},
      paramTypes: checkpoint.paramTypes || {},
      variables: checkpoint.variables ? structuredCloneJSON(checkpoint.variables) : {},
      varTypes: checkpoint.varTypes || {},
      localVars: checkpoint.localVars ? structuredCloneJSON(checkpoint.localVars) : {},
      artifacts: checkpoint.artifacts || {},
      adapters,
      runParams: checkpoint.runParams || null,
      version: checkpoint.version || this.workflow.version || '3.15',
    });

    // Expose active context on engine instance (for external resume/pause access)
    this.activeCtx = ctx;

    // Restore completed steps list
    if (checkpoint.completedSteps) {
      ctx._completedSteps = [...checkpoint.completedSteps];
    }

    // Restore parallel branch completion state
    if (checkpoint.completedBranches) {
      for (const [parent, children] of Object.entries(checkpoint.completedBranches)) {
        ctx._completedBranches.set(parent, new Set(children));
      }
    }

    // Restore loop progress
    if (checkpoint.loopProgress) {
      for (const [loopID, count] of Object.entries(checkpoint.loopProgress)) {
        ctx._loopProgress.set(loopID, count);
      }
    }

    // Restore event sequence counter for continuity
    if (checkpoint.eventSeq) {
      ctx._eventSeq = checkpoint.eventSeq;
    }

    // Apply overrides (e.g., user tweaked a variable before re-running)
    if (overrides) {
      for (const [k, v] of Object.entries(overrides)) {
        if (k.startsWith('$') || k.startsWith('_')) {
          ctx.setVariable(k, v);
        } else {
          ctx.params[k] = v;
        }
      }
    }

    // Attach event listener
    if (this._onEvent) ctx.onEvent(this._onEvent);

    // Execute from the specified step
    return this._runWorkflow(
      ctx,
      {
        name: this.workflow.name,
        version: this.workflow.version,
        params: ctx.params,
        resumedFrom: startStepID,
      },
      () => this.executeStep(ctx, startStep),
    );
  }

  // ─── Step Execution ────────────────────────────────────────

  /**
   * Run one step (with its retry / onError policy), then its children and its `next` chain.
   * @param {ExecutionContext} ctx
   * @param {object} step — step definition
   * @returns {Promise<void>}
   * @throws the step error when it is not handled by onError; LoopBreakSignal for BREAK
   */
  async executeStep(ctx, step) {
    // Short-circuit if stopped/cancelled/paused
    if (ctx.status === ExecutionStatus.Stopped || ctx.aborted) return;
    if (ctx.status === ExecutionStatus.Paused) {
      this._emitCheckpoint(ctx); // save checkpoint so user can resume
      return;
    }

    ctx.currentStepID = step.id;
    const rawType = getStepType(step.id);
    const stepType = resolveStepBehavior(rawType) || rawType;

    // Evaluate condition: a falsy `if` skips this step but still follows `next`.
    if (step.if) {
      const ev = new ExpressionEvaluator(ctx);
      const cond = ev.evaluateValue(step.if);
      if (!toBool(cond)) {
        emitEvent(ctx, RunEventType.StepSkipped, step.id, { condition: step.if });
        return this._moveToNext(ctx, step);
      }
    }

    // Resolve inputs for event payload (lightweight re-evaluation, approach B)
    let resolvedInputs = null;
    if (step.in && typeof step.in === 'object') {
      try {
        const ev2 = new ExpressionEvaluator(ctx);
        resolvedInputs = summarizeDeep(ev2.evaluateDeep(step.in));
      } catch { /* best-effort — don't break execution */ }
    }

    // Emit step_start with resolved inputs
    emitEvent(ctx, RunEventType.StepStart, step.id, {
      type: rawType,
      meta: step.meta,
      resolvedInputs, // may be null if step has no inputs
    });

    // Collect file targets for file_start events
    for (const ft of this._collectFileTargets(step)) {
      emitEvent(ctx, RunEventType.FileStart, step.id, { path: ft });
    }

    // Determine max attempts for onError: 'retry' (v3.10+)
    const isRetry = step.onError === 'retry' && isV310OrLater(this.workflow.version);
    const maxAttempts = isRetry ? (step.retryCount ?? 1) + 1 : 1;
    const customHandler = this._getCustomHandler(step.id);

    let err = null;
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      err = null;
      try {
        if (customHandler) {
          // Delegate to custom handler — same lifecycle as built-in steps
          await customHandler(this, ctx, step);
        } else {
          switch (stepType) {
            case 'Service': await executeServiceStep(this, ctx, step); break;
            case 'Component': await executeComponentStep(this, ctx, step); break;
            case 'LLM': await executeLLMStep(this, ctx, step); break;
            case 'API': await executeAPIStep(this, ctx, step); break;
            case 'Set': executeSetStep(this, ctx, step); break;
            case 'Write': await executeWriteStep(this, ctx, step); break;
            case 'Download': await executeDownloadStep(this, ctx, step); break;
            case 'Unzip': await executeUnzipStep(this, ctx, step); break;
            case 'Pause': {
              const result = await executePauseStep(this, ctx, step);
              if (result?.handled) return; // Pause handles its own next
              break;
            }
            case 'Branch': {
              const selectedID = executeBranchStep(this, ctx, step);
              emitEvent(ctx, RunEventType.StepDone, step.id, {
                selected: selectedID,
                outputs: collectStepOutputs(ctx, step),
              });
              if (ctx._completedSteps) ctx._completedSteps.push(step.id);
              this._emitCheckpoint(ctx);
              if (selectedID) {
                const branchStep = this.getStep(selectedID);
                if (branchStep) await this.executeStep(ctx, branchStep);
              }
              return this._moveToNext(ctx, step);
            }
            case 'Loop':
              await executeLoopStep(this, ctx, step);
              emitEvent(ctx, RunEventType.StepDone, step.id, { outputs: collectStepOutputs(ctx, step) });
              if (ctx._completedSteps) ctx._completedSteps.push(step.id);
              this._emitCheckpoint(ctx);
              return this._moveToNext(ctx, step);
            case 'Stop':
              ctx.status = ExecutionStatus.Stopped;
              emitEvent(ctx, RunEventType.WorkflowDone, null, {
                stop_id: step.id,
                duration_ms: Date.now() - ctx.startTime,
              });
              return;
            case 'Noop':
              break;
            default:
              throw new Error(`Unknown step type: ${stepType} (${step.id})`);
          }
        }
      } catch (e) {
        // BREAK is control flow aimed at the enclosing Loop (e.g. raised by a Branch's
        // selected step), not a failure of this step: never report, retry or swallow it.
        if (e instanceof LoopBreakSignal) throw e;
        err = e;
        if (attempt + 1 < maxAttempts) {
          emitEvent(ctx, RunEventType.StepError, step.id, { error: e.message, attempt: attempt + 1, maxAttempts });
          if (step.retryDelay) await new Promise(r => setTimeout(r, step.retryDelay));
          continue;
        }
      }
      if (!err) break; // success — exit retry loop
    }

    if (err) {
      emitEvent(ctx, RunEventType.StepError, step.id, { error: err.message });
      // OnError handler (v3.10+)
      if (step.onError && isV310OrLater(this.workflow.version)) {
        ctx.setVariable('_error', buildErrorMap(err));
        // Built-in keywords (including an exhausted 'retry') just continue with `next`.
        if (ONERROR_BUILTINS.has(step.onError)) {
          ctx.setVariable('_error', undefined);
          return this._moveToNext(ctx, step);
        }
        // Otherwise onError names a handler step, which can read `_error`.
        const errorStep = this.getStep(step.onError);
        try {
          if (errorStep) await this.executeStep(ctx, errorStep);
        } finally {
          ctx.setVariable('_error', undefined);
          ctx.setVariable('_meta', undefined);
        }
        return this._moveToNext(ctx, step);
      }
      throw err;
    }

    // step_print (v3.13+)
    if (step.print) {
      const ev = new ExpressionEvaluator(ctx);
      const printVal = ev.evaluateValue(step.print);
      emitEvent(ctx, RunEventType.StepPrint, step.id, { value: printVal });
    }

    emitEvent(ctx, RunEventType.StepDone, step.id, { outputs: collectStepOutputs(ctx, step) });
    if (ctx._completedSteps) ctx._completedSteps.push(step.id);
    this._emitCheckpoint(ctx);

    // Execute children (parallel branches); Loop and Branch consume their children themselves.
    if (step.children && step.children.length > 0 && stepType !== 'Loop' && stepType !== 'Branch') {
      await this._executeChildren(ctx, step.children, step.id);
    }

    // Move to next
    return this._moveToNext(ctx, step);
  }

  /** Emit checkpoint after step completion (if callback registered). */
  _emitCheckpoint(ctx) {
    if (this._onCheckpoint) {
      // Deliberately best-effort: a failing checkpoint sink must not abort the run.
      try { this._onCheckpoint(ctx.checkpoint()); } catch {}
    }
  }

  // ─── Internal ──────────────────────────────────────────────

  /**
   * Follow `step.next`: RETURN/STOP/absent end this chain, BREAK throws LoopBreakSignal,
   * anything else runs the named step (unless the run was stopped or aborted).
   */
  async _moveToNext(ctx, step) {
    if (!step.next || step.next === 'RETURN' || step.next === 'STOP') return;
    if (step.next === 'BREAK') throw new LoopBreakSignal();
    if (ctx.status === ExecutionStatus.Stopped || ctx.aborted) return;
    const nextStep = this.getStep(step.next);
    if (nextStep) await this.executeStep(ctx, nextStep);
  }

  /**
   * Run child steps: a single child inline on `ctx`, several in parallel, each on its own
   * ChildExecutionContext. Children already recorded as completed for `parentStepID`
   * (restored from a checkpoint) are skipped.
   */
  async _executeChildren(ctx, childIDs, parentStepID = null) {
    if (!childIDs || childIDs.length === 0) return;

    // Phase 2: filter out already-completed branches (for resume)
    const completedSet = parentStepID ? ctx._completedBranches.get(parentStepID) : null;
    const pendingIDs = completedSet ? childIDs.filter(id => !completedSet.has(id)) : childIDs;
    if (pendingIDs.length === 0) return;

    if (pendingIDs.length === 1) {
      const child = this.getStep(pendingIDs[0]);
      if (child) {
        await this.executeStep(ctx, child);
        // Track completion
        if (parentStepID) this._markBranchCompleted(ctx, parentStepID, pendingIDs[0]);
      }
      return;
    }

    // Parallel execution
    const engine = this;
    const branches = pendingIDs.map(id => ({
      id,
      fn: async (failFastSignal) => {
        const childCtx = new ChildExecutionContext(ctx);
        if (failFastSignal) childCtx._failFastSignal = failFastSignal;
        // Copy parent local vars to child
        for (const key of ['_item', '_index', '_result', '_meta', '_error']) {
          if (ctx.localVars?.[key] !== undefined) childCtx.localVars[key] = ctx.localVars[key];
          else if (ctx.getVariable?.(key) !== undefined) childCtx.localVars[key] = ctx.getVariable(key);
        }
        const step = engine.getStep(id);
        if (step) await engine.executeStep(childCtx, step);
        // Track branch completion
        if (parentStepID) engine._markBranchCompleted(ctx, parentStepID, id);
      },
    }));
    await this.parallelExecutor.execute(branches, this.errorStrategy, ctx.signal);
  }

  /** Record `childID` as finished under `parentStepID` and emit a checkpoint. */
  _markBranchCompleted(ctx, parentStepID, childID) {
    if (!ctx._completedBranches.has(parentStepID)) {
      ctx._completedBranches.set(parentStepID, new Set());
    }
    ctx._completedBranches.get(parentStepID).add(childID);
    this._emitCheckpoint(ctx);
  }

  /**
   * Entry nodes are steps that no other step references (via next, children, cases,
   * default, if_true/if_false, onError or timeout.on).
   * @returns {string[]} entry step IDs in declaration order
   */
  _findEntryNodeIDs() {
    // Tolerate a missing steps array (as the constructor does); execute() then reports
    // "No entry nodes found" instead of a TypeError.
    const steps = this.workflow.steps || [];
    const referenced = new Set();
    for (const step of steps) {
      if (step.next && step.next !== 'RETURN' && step.next !== 'STOP' && step.next !== 'BREAK') referenced.add(step.next);
      for (const child of (step.children || [])) referenced.add(child);
      for (const cas of (step.cases || [])) {
        if (Array.isArray(cas) && cas.length >= 2) referenced.add(cas[1]);
        if (cas && typeof cas === 'object' && !Array.isArray(cas)) {
          const ref = cas.next || cas.step;
          if (ref) referenced.add(ref);
        }
      }
      if (step.default) referenced.add(step.default);
      if (step.if_true) referenced.add(step.if_true);
      if (step.if_false) referenced.add(step.if_false);
      if (step.onError) referenced.add(step.onError);
      if (step.timeout?.on) referenced.add(step.timeout.on);
    }
    return steps.map(s => s.id).filter(id => !referenced.has(id));
  }

  /**
   * File paths a step is expected to touch: a Write step's string `target`, plus any
   * `out` mapping keys that start with `/`. A string `out` shorthand has no file targets.
   */
  _collectFileTargets(step) {
    const targets = [];
    if (step.target && typeof step.target === 'string' && getStepType(step.id) === 'Write') {
      targets.push(step.target);
    }
    if (step.out && typeof step.out === 'object') {
      for (const target of Object.keys(step.out)) {
        if (target.startsWith('/')) targets.push(target);
      }
    }
    return targets;
  }
}

// ─── Event Payload Helpers ────────────────────────────────────

/**
 * Truncate large values to prevent oversized event payloads.
 * If a single value serializes to more than `maxBytes` (measured as JSON string length),
 * it is replaced with a summary holding a 200-character preview.
 */
function summarizeIfLarge(value, maxBytes = 10240) {
  if (value === null || value === undefined) return value;
  if (typeof value !== 'object' && typeof value !== 'string') return value;
  let json;
  try {
    json = JSON.stringify(value);
  } catch {
    return { _truncated: true, type: typeof value, error: 'non-serializable' };
  }
  // JSON.stringify yields undefined when e.g. toJSON() returns undefined — nothing to measure.
  if (json === undefined || json.length <= maxBytes) return value;
  return {
    _truncated: true,
    type: typeof value,
    length: json.length,
    preview: json.slice(0, 200) + '...',
  };
}

/**
 * Apply summarizeIfLarge to every top-level entry of an object or array
 * (arrays stay arrays); other values are summarized directly.
 */
function summarizeDeep(obj, maxBytes = 10240) {
  if (!obj || typeof obj !== 'object') return summarizeIfLarge(obj, maxBytes);
  if (Array.isArray(obj)) return obj.map(v => summarizeIfLarge(v, maxBytes));
  const out = {};
  for (const [k, v] of Object.entries(obj)) {
    out[k] = summarizeIfLarge(v, maxBytes);
  }
  return out;
}

/**
 * Collect outputs after applyOutputMapping by reading back the targets from ctx.
 * For $variable / _local targets, read the value; for /file/ targets, return metadata only.
 * @returns {object|null} null when the step has no `out`
 */
function collectStepOutputs(ctx, step) {
  if (!step.out) return null;
  const outputs = {};
  // Shorthand: out is a string like "$plan"
  if (typeof step.out === 'string') {
    outputs[step.out] = summarizeIfLarge(ctx.getVariable(step.out));
    return outputs;
  }
  for (const target of Object.keys(step.out)) {
    if (target.startsWith('/')) {
      // File target — only metadata
      outputs[target] = { _file: true, path: target };
    } else if (target.startsWith('$') || target.startsWith('_')) {
      outputs[target] = summarizeIfLarge(ctx.getVariable(target));
    }
  }
  return outputs;
}

// ─── Helpers ─────────────────────────────────────────────────

/** Parse a JSON string, returning the input unchanged when it is not valid JSON. */
function parseJSONOrKeep(value) {
  try {
    return JSON.parse(value);
  } catch {
    return value;
  }
}

/** Coerce a param value to its declared type (OBJECT: parse JSON strings; STRING: stringify). */
function applyParamType(type, value) {
  if (type === 'OBJECT' && typeof value === 'string') return parseJSONOrKeep(value);
  if (type === 'STRING' && typeof value !== 'string') {
    return typeof value === 'object' ? JSON.stringify(value) : String(value);
  }
  return value;
}

/** Coerce a variable value: OBJECT and array (`[...]`) types parse JSON strings. */
function applyVarType(type, value) {
  if (typeof value !== 'string') return value;
  if (type === 'OBJECT' || type?.startsWith('[')) return parseJSONOrKeep(value);
  return value;
}

module.exports = { Engine };