| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709 |
/**
 * VL Workflow Engine — Main Engine
 * Ported from Go: workflow/engine.go
 * Spec: v3.16
 *
 * Usage:
 *   const engine = new Engine(workflow, { onEvent: event => console.log(event) });
 *   const ctx = await engine.execute(initialVars, adapters);
 *   console.log(ctx.status);
 */
- const { Registry, parseParamDeclaration, parseVariableDeclaration, validateRegistry } = require('./registry');
- const { ParallelExecutor } = require('./parallel');
- const {
- getStepType, resolveStepBehavior, ExecutionStatus, RunEventType, ParallelErrorStrategy,
- SUPPORTED_VERSIONS, isV310OrLater, RESERVED_NEXT_KEYWORDS, buildErrorMap,
- ExecutionContext, ChildExecutionContext, structuredCloneJSON
- } = require('./types');
- const { ExpressionEvaluator, toBool } = require('./expression');
- const {
- executeServiceStep, executeComponentStep, executeLLMStep,
- executeAPIStep, executeSetStep, executeWriteStep,
- executeBranchStep, executeLoopStep,
- executeDownloadStep, executeUnzipStep, executePauseStep,
- emitEvent, LoopBreakSignal
- } = require('./executor');
class Engine {
  /**
   * Build an engine for one parsed workflow definition.
   *
   * @param {object} workflow — parsed workflow JSON
   * @param {object} [options]
   * @param {number} [options.maxConcurrency] — passed to ParallelExecutor (0 when omitted)
   * @param {string} [options.errorStrategy] — parallel error strategy (defaults to FailFast)
   * @param {object} [options.customHandlers] — { Prefix: async (engine, ctx, step) => {...} }
   * @param {function} [options.onEvent] — listener attached to every context this engine creates
   * @param {function} [options.onCheckpoint] — receives ctx.checkpoint() after step completions
   */
  constructor(workflow, options = {}) {
    this.workflow = workflow;
    this.registry = new Registry(workflow.registry || {});
    this.stepMap = new Map();
    for (const step of (workflow.steps || [])) {
      this.stepMap.set(step.id, step);
    }
    this.errorStrategy = options.errorStrategy || ParallelErrorStrategy.FailFast;
    this.parallelExecutor = new ParallelExecutor(options.maxConcurrency || 0);
    // Custom step handlers: { Prefix: async (engine, ctx, step) => {...} }
    this.customHandlers = options.customHandlers || {};
    // Event listener attached to every ExecutionContext created by execute()
    this._onEvent = options.onEvent || null;
    // Checkpoint callback: called after each step completes with ctx.checkpoint()
    this._onCheckpoint = options.onCheckpoint || null;
  }

  /**
   * Look up the custom handler registered for a step ID's prefix (text before the first "_").
   * Only own properties of customHandlers count, so IDs such as "toString_X" or
   * "constructor_X" never resolve to members inherited from Object.prototype.
   *
   * @param {string} stepID
   * @returns {function|null} the handler, or null when none is registered
   */
  _getCustomHandler(stepID) {
    if (typeof stepID !== 'string') return null;
    const prefix = stepID.split('_')[0];
    if (!Object.prototype.hasOwnProperty.call(this.customHandlers, prefix)) return null;
    return this.customHandlers[prefix] || null;
  }

  /** Check if a step prefix has a custom handler. */
  _isCustomStep(stepID) {
    return this._getCustomHandler(stepID) !== null;
  }

  /** Return the step definition with the given ID, or null when it does not exist. */
  getStep(id) { return this.stepMap.get(id) || null; }

  // ─── Validate ──────────────────────────────────────────────

  /**
   * Statically check the workflow definition.
   * Covers version/name/steps presence, registry validation, duplicate and unknown
   * step types, reference integrity of every step-to-step link, and the v3.16 rules
   * for Loop, BREAK and LLM model format.
   *
   * @returns {string[]} human-readable error messages; empty when the workflow is valid
   */
  validate() {
    const errors = [];
    const wf = this.workflow;
    const steps = wf.steps || [];
    if (!SUPPORTED_VERSIONS.has(wf.version)) errors.push(`Unsupported version: ${wf.version}`);
    if (!wf.name) errors.push('Workflow name required');
    if (!wf.steps || wf.steps.length === 0) errors.push('At least one step required');

    // Registry validation
    errors.push(...validateRegistry(wf.registry));

    // Step validation (also gathers the IDs listed as children of any Loop step,
    // so the BREAK rule below does not rescan every step for each BREAK)
    const ids = new Set();
    const loopChildIDs = new Set();
    for (const step of steps) {
      if (ids.has(step.id)) errors.push(`Duplicate step ID: ${step.id}`);
      ids.add(step.id);
      const type = getStepType(step.id);
      if (!type && !this._isCustomStep(step.id)) errors.push(`Unknown step type: ${step.id}`);
      if (type === 'Stop' && (step.next || (step.children && step.children.length))) {
        errors.push(`Stop_* cannot have next or children: ${step.id}`);
      }
      if (type === 'Loop') {
        for (const childId of (step.children || [])) loopChildIDs.add(childId);
      }
    }

    // onError can be a step ID or a builtin keyword
    const ONERROR_BUILTINS = new Set(['skip', 'retry', 'ignore', 'continue']);

    // Reference integrity
    for (const step of steps) {
      if (step.next && !RESERVED_NEXT_KEYWORDS.has(step.next) && !ids.has(step.next)) {
        errors.push(`Step ${step.id}: next references unknown step ${step.next}`);
      }
      for (const childId of (step.children || [])) {
        if (!ids.has(childId)) errors.push(`Step ${step.id}: child references unknown step ${childId}`);
      }
      // Cases come in two shapes: [condition, stepID] tuples or { next | step } objects
      for (const cas of (step.cases || [])) {
        if (Array.isArray(cas) && cas.length >= 2 && !ids.has(cas[1])) {
          errors.push(`Step ${step.id}: case references unknown step ${cas[1]}`);
        }
        if (cas && typeof cas === 'object' && !Array.isArray(cas)) {
          const ref = cas.next || cas.step;
          if (ref && !ids.has(ref)) errors.push(`Step ${step.id}: case references unknown step ${ref}`);
        }
      }
      if (step.default && !ids.has(step.default)) {
        errors.push(`Step ${step.id}: default references unknown step ${step.default}`);
      }
      // Check_* style: if_true / if_false references
      if (step.if_true && !ids.has(step.if_true)) {
        errors.push(`Step ${step.id}: if_true references unknown step ${step.if_true}`);
      }
      if (step.if_false && !ids.has(step.if_false)) {
        errors.push(`Step ${step.id}: if_false references unknown step ${step.if_false}`);
      }
      if (step.onError && !ids.has(step.onError) && !ONERROR_BUILTINS.has(step.onError)) {
        errors.push(`Step ${step.id}: onError references unknown step ${step.onError}`);
      }
      if (step.timeout?.on && !ids.has(step.timeout.on)) {
        errors.push(`Step ${step.id}: timeout.on references unknown step ${step.timeout.on}`);
      }

      // v3.16 — Loop: while and source are mutually exclusive
      const sType = getStepType(step.id);
      if (sType === 'Loop') {
        if (step.while != null && step.source != null) {
          errors.push(`Step ${step.id}: Loop cannot have both "while" and "source" (mutually exclusive)`);
        }
        // Number.isInteger also rejects non-numbers, so fractional values such as 1.5
        // are reported, as the message promises
        if (step.maxIterations != null && (!Number.isInteger(step.maxIterations) || step.maxIterations < 1)) {
          errors.push(`Step ${step.id}: maxIterations must be a positive integer`);
        }
      }

      // v3.16 — BREAK only valid inside Loop children
      if (step.next === 'BREAK' && !loopChildIDs.has(step.id)) {
        errors.push(`Step ${step.id}: BREAK is only valid inside Loop children`);
      }

      // v3.16 — LLM model format: <provider>/<modelId> or <provider> or omitted
      if (sType === 'LLM' && step.model) {
        // A non-string model is reported instead of crashing the validator on .split()
        if (typeof step.model !== 'string' || step.model.split('/').length > 2) {
          errors.push(`Step ${step.id}: model format must be "<provider>/<modelId>" or "<provider>"`);
        }
      }
    }
    return errors;
  }

  // ─── Execute ───────────────────────────────────────────────

  /**
   * Execute the workflow from its entry node(s).
   *
   * Params are built from registry declarations (typed value from initialVars, else the
   * declared default), then undeclared non-"$" initialVars are passed through, then
   * runParams.params override. Declared variables start at their typed initialVars value
   * or null. The created context is also exposed as `this.activeCtx`.
   *
   * @param {object} initialVars — initial param/variable values
   * @param {object} adapters — { service, api, component, llm, file, doc }
   * @param {object} [runParams] — optional run-level params
   * @returns {Promise<ExecutionContext>} the context after the run ends
   * @throws rethrows any step error after emitting WorkflowFailed
   */
  async execute(initialVars = {}, adapters = {}, runParams = null) {
    const workflowID = `wf_${Date.now()}`;
    const paramDecls = this.registry.getParamDeclarations();
    const varDecls = this.registry.getVariableDeclarations();
    // Own-property test: an input named e.g. "constructor" must not look "declared"
    const isDeclaredParam = (name) => Object.prototype.hasOwnProperty.call(paramDecls, name);

    // Initialize params (read-only)
    const params = {};
    const paramTypes = {};
    for (const [name, decl] of Object.entries(paramDecls)) {
      paramTypes[name] = decl.type;
      if (initialVars[name] !== undefined) {
        params[name] = applyParamType(decl.type, initialVars[name]);
      } else if (decl.default !== undefined) {
        params[name] = decl.default;
      }
    }
    // Store extra initialVars not declared in registry as params (pass-through)
    for (const [name, value] of Object.entries(initialVars)) {
      if (!isDeclaredParam(name) && !name.startsWith('$')) {
        params[name] = value;
      }
    }
    // RunParams override
    if (runParams?.params) {
      for (const [k, v] of Object.entries(runParams.params)) {
        params[k] = isDeclaredParam(k) ? applyParamType(paramDecls[k].type, v) : v;
      }
    }

    // Initialize variables
    const variables = {};
    const varTypes = {};
    for (const [name, decl] of Object.entries(varDecls)) {
      varTypes[name] = decl.type;
      variables[name] = initialVars[name] !== undefined ? applyVarType(decl.type, initialVars[name]) : null;
    }

    const ctx = new ExecutionContext({
      workflowID, params, paramTypes, variables, varTypes,
      adapters, runParams, version: this.workflow.version || '3.15'
    });
    // Expose active context on engine instance (for external resume/pause access)
    this.activeCtx = ctx;
    // Attach event listener if provided
    if (this._onEvent) ctx.onEvent(this._onEvent);

    return this._runWorkflow(
      ctx,
      { name: this.workflow.name, version: this.workflow.version, params },
      async () => {
        const entryIDs = this._findEntryNodeIDs();
        if (entryIDs.length === 0) throw new Error('No entry nodes found');
        if (entryIDs.length === 1) {
          await this.executeStep(ctx, this.getStep(entryIDs[0]));
        } else {
          // Several entry nodes run as sibling branches under the synthetic parent '_entry'
          await this._executeChildren(ctx, entryIDs, '_entry');
        }
      }
    );
  }

  /**
   * Shared run lifecycle for execute() and executeFrom(): emits WorkflowStart, runs the
   * body, then emits WorkflowDone (only if the status is still Running) or marks the
   * context Failed, emits WorkflowFailed and rethrows.
   *
   * @param {ExecutionContext} ctx
   * @param {object} startPayload — payload of the WorkflowStart event
   * @param {function(): Promise<void>} run — performs the actual step execution
   * @returns {Promise<ExecutionContext>} ctx
   */
  async _runWorkflow(ctx, startPayload, run) {
    try {
      emitEvent(ctx, RunEventType.WorkflowStart, null, startPayload);
      await run();
      // Stop_* and pause change the status themselves; only a still-running context
      // is promoted to Completed
      if (ctx.status === ExecutionStatus.Running) {
        ctx.status = ExecutionStatus.Completed;
        emitEvent(ctx, RunEventType.WorkflowDone, null, {
          duration_ms: Date.now() - ctx.startTime
        });
      }
    } catch (err) {
      ctx.status = ExecutionStatus.Failed;
      emitEvent(ctx, RunEventType.WorkflowFailed, null, {
        error: err.message, failed_step_id: ctx.currentStepID,
        duration_ms: Date.now() - ctx.startTime
      });
      throw err;
    }
    return ctx;
  }

  /**
   * Resume a paused workflow by resolving its pending pause.
   * A request whose requestId was already seen is ignored (idempotent replay).
   *
   * @param {ExecutionContext} ctx — context holding `pauseState`
   * @param {object} request — { token, payload?, requestId? }
   * @throws {Error} when the workflow is not paused or the token does not match
   */
  resume(ctx, request) {
    if (!ctx.pauseState) throw new Error('Workflow is not paused');
    if (ctx.pauseState.token !== request.token) {
      emitEvent(ctx, RunEventType.PauseRejected, ctx.pauseState.nodeID, {
        reason: 'invalid_token'
      });
      throw new Error('Invalid pause token');
    }
    // Idempotency
    if (request.requestId && ctx.pauseState.seenRequestIDs.has(request.requestId)) return;
    if (request.requestId) ctx.pauseState.seenRequestIDs.add(request.requestId);
    ctx.pauseState._resolve({ payload: request.payload, requestId: request.requestId });
  }

  // ─── Execute From Checkpoint / Step ID ─────────────────────

  /**
   * Resume execution from a checkpoint or a specific step.
   *
   * Usage 1 — from checkpoint (crash recovery):
   *   const cp = JSON.parse(fs.readFileSync('checkpoint.json'));
   *   const ctx = await engine.executeFrom(cp, adapters);
   *
   * Usage 2 — from step ID with variable overrides (re-run):
   *   const ctx = await engine.executeFrom({
   *     currentStepID: 'LLM_GenerateCode',
   *     variables: { '$plan': modifiedPlan },
   *     params: originalParams
   *   }, adapters);
   *
   * @param {object} checkpoint — checkpoint object or { currentStepID, params?, variables? }
   * @param {object} adapters — { service, api, component, llm, file, doc }
   * @param {object} [overrides] — applied after restore; keys starting with "$" or "_"
   *   are set as variables, all others are written to params
   * @returns {Promise<ExecutionContext>}
   * @throws {Error} when currentStepID is missing or names an unknown step
   */
  async executeFrom(checkpoint, adapters = {}, overrides = null) {
    if (!checkpoint?.currentStepID) throw new Error('checkpoint.currentStepID is required');
    const startStepID = checkpoint.currentStepID;
    const startStep = this.getStep(startStepID);
    if (!startStep) throw new Error(`Step not found: ${startStepID}`);

    // Rebuild context from checkpoint data
    const workflowID = checkpoint.workflowID || `wf_resume_${Date.now()}`;
    const ctx = new ExecutionContext({
      workflowID,
      params: checkpoint.params ? structuredCloneJSON(checkpoint.params) : {},
      paramTypes: checkpoint.paramTypes || {},
      variables: checkpoint.variables ? structuredCloneJSON(checkpoint.variables) : {},
      varTypes: checkpoint.varTypes || {},
      localVars: checkpoint.localVars ? structuredCloneJSON(checkpoint.localVars) : {},
      artifacts: checkpoint.artifacts || {},
      adapters,
      runParams: checkpoint.runParams || null,
      version: checkpoint.version || this.workflow.version || '3.15'
    });
    // Expose active context on engine instance (for external resume/pause access)
    this.activeCtx = ctx;

    // Restore completed steps list
    if (checkpoint.completedSteps) {
      ctx._completedSteps = [...checkpoint.completedSteps];
    }
    // Restore parallel branch completion state
    if (checkpoint.completedBranches) {
      for (const [parent, children] of Object.entries(checkpoint.completedBranches)) {
        ctx._completedBranches.set(parent, new Set(children));
      }
    }
    // Restore loop progress
    if (checkpoint.loopProgress) {
      for (const [loopID, count] of Object.entries(checkpoint.loopProgress)) {
        ctx._loopProgress.set(loopID, count);
      }
    }
    // Restore event sequence counter for continuity
    if (checkpoint.eventSeq) {
      ctx._eventSeq = checkpoint.eventSeq;
    }
    // Apply overrides (e.g., user tweaked a variable before re-running)
    if (overrides) {
      for (const [k, v] of Object.entries(overrides)) {
        if (k.startsWith('$') || k.startsWith('_')) {
          ctx.setVariable(k, v);
        } else {
          ctx.params[k] = v;
        }
      }
    }
    // Attach event listener
    if (this._onEvent) ctx.onEvent(this._onEvent);

    // Execute from the specified step
    return this._runWorkflow(
      ctx,
      {
        name: this.workflow.name, version: this.workflow.version,
        params: ctx.params, resumedFrom: startStepID
      },
      () => this.executeStep(ctx, startStep)
    );
  }

  // ─── Step Execution ────────────────────────────────────────

  /**
   * Execute one step and then continue along its `children` / `next` links.
   *
   * Order of operations: status short-circuit → `if` condition → step_start / file_start
   * events → handler dispatch (with retries when onError is 'retry' on v3.10+) →
   * error handling → step_print → step_done + checkpoint → children → next.
   * Branch, Loop, Stop and a handled Pause finish inside the dispatch and return early.
   *
   * @param {ExecutionContext} ctx
   * @param {object} step — step definition
   * @returns {Promise<void>}
   * @throws the step's error when no onError applies; LoopBreakSignal passes through untouched
   */
  async executeStep(ctx, step) {
    // Short-circuit if stopped/cancelled/paused
    if (ctx.status === ExecutionStatus.Stopped || ctx.aborted) return;
    if (ctx.status === ExecutionStatus.Paused) {
      this._emitCheckpoint(ctx); // save checkpoint so user can resume
      return;
    }
    ctx.currentStepID = step.id;
    const rawType = getStepType(step.id);
    const stepType = resolveStepBehavior(rawType) || rawType;

    // Evaluate condition
    if (step.if) {
      const ev = new ExpressionEvaluator(ctx);
      const cond = ev.evaluateValue(step.if);
      if (!toBool(cond)) {
        emitEvent(ctx, RunEventType.StepSkipped, step.id, { condition: step.if });
        return this._moveToNext(ctx, step);
      }
    }

    // Resolve inputs for event payload (lightweight re-evaluation, approach B)
    let resolvedInputs = null;
    if (step.in && typeof step.in === 'object') {
      try {
        const ev2 = new ExpressionEvaluator(ctx);
        resolvedInputs = summarizeDeep(ev2.evaluateDeep(step.in));
      } catch { /* best-effort — don't break execution */ }
    }
    // Emit step_start with resolved inputs
    emitEvent(ctx, RunEventType.StepStart, step.id, {
      type: rawType, meta: step.meta,
      resolvedInputs // may be null if step has no inputs
    });

    // Collect file targets for file_start events
    const fileTargets = this._collectFileTargets(step);
    for (const ft of fileTargets) {
      emitEvent(ctx, RunEventType.FileStart, step.id, { path: ft });
    }

    // Determine max attempts for onError: 'retry' (v3.10+)
    const isRetry = step.onError === 'retry' && isV310OrLater(this.workflow.version);
    const maxAttempts = isRetry ? (step.retryCount ?? 1) + 1 : 1;
    let err = null;
    const customHandler = this._getCustomHandler(step.id);

    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      err = null;
      try {
        if (customHandler) {
          // Delegate to custom handler — same lifecycle as built-in steps
          await customHandler(this, ctx, step);
        } else {
          switch (stepType) {
            case 'Service': await executeServiceStep(this, ctx, step); break;
            case 'Component': await executeComponentStep(this, ctx, step); break;
            case 'LLM': await executeLLMStep(this, ctx, step); break;
            case 'API': await executeAPIStep(this, ctx, step); break;
            case 'Set': executeSetStep(this, ctx, step); break;
            case 'Write': await executeWriteStep(this, ctx, step); break;
            case 'Download': await executeDownloadStep(this, ctx, step); break;
            case 'Unzip': await executeUnzipStep(this, ctx, step); break;
            case 'Pause': {
              const result = await executePauseStep(this, ctx, step);
              if (result?.handled) return; // Pause handles its own next
              break;
            }
            case 'Branch': {
              const selectedID = executeBranchStep(this, ctx, step);
              emitEvent(ctx, RunEventType.StepDone, step.id, {
                selected: selectedID, outputs: collectStepOutputs(ctx, step)
              });
              if (ctx._completedSteps) ctx._completedSteps.push(step.id);
              this._emitCheckpoint(ctx);
              if (selectedID) {
                const branchStep = this.getStep(selectedID);
                if (branchStep) await this.executeStep(ctx, branchStep);
              }
              return this._moveToNext(ctx, step);
            }
            case 'Loop':
              await executeLoopStep(this, ctx, step);
              emitEvent(ctx, RunEventType.StepDone, step.id, {
                outputs: collectStepOutputs(ctx, step)
              });
              if (ctx._completedSteps) ctx._completedSteps.push(step.id);
              this._emitCheckpoint(ctx);
              return this._moveToNext(ctx, step);
            case 'Stop':
              ctx.status = ExecutionStatus.Stopped;
              emitEvent(ctx, RunEventType.WorkflowDone, null, {
                stop_id: step.id, duration_ms: Date.now() - ctx.startTime
              });
              return;
            case 'Noop':
              break;
            default:
              throw new Error(`Unknown step type: ${stepType} (${step.id})`);
          }
        }
      } catch (e) {
        // A loop break raised by a nested step (e.g. the target selected by a Branch)
        // is control flow, not a failure of this step: never retry it, report it as
        // StepError, or let onError swallow it.
        if (e instanceof LoopBreakSignal) throw e;
        err = e;
        if (attempt + 1 < maxAttempts) {
          emitEvent(ctx, RunEventType.StepError, step.id, {
            error: e.message, attempt: attempt + 1, maxAttempts
          });
          if (step.retryDelay) await new Promise(r => setTimeout(r, step.retryDelay));
          continue;
        }
      }
      if (!err) break; // success — exit retry loop
    }

    if (err) {
      emitEvent(ctx, RunEventType.StepError, step.id, { error: err.message });
      // OnError handler (v3.10+)
      if (step.onError && isV310OrLater(this.workflow.version)) {
        ctx.setVariable('_error', buildErrorMap(err));
        // Handle built-in keywords explicitly: all of them (including an exhausted
        // 'retry') drop the error and continue with the next step
        if (step.onError === 'retry' || step.onError === 'skip' ||
            step.onError === 'ignore' || step.onError === 'continue') {
          ctx.setVariable('_error', undefined);
          return this._moveToNext(ctx, step);
        }
        // Fall through to step-ID handler
        const errorStep = this.getStep(step.onError);
        try {
          if (errorStep) await this.executeStep(ctx, errorStep);
        } finally {
          ctx.setVariable('_error', undefined);
          ctx.setVariable('_meta', undefined);
        }
        return this._moveToNext(ctx, step);
      }
      throw err;
    }

    // step_print (v3.13+)
    if (step.print) {
      const ev = new ExpressionEvaluator(ctx);
      const printVal = ev.evaluateValue(step.print);
      emitEvent(ctx, RunEventType.StepPrint, step.id, { value: printVal });
    }
    emitEvent(ctx, RunEventType.StepDone, step.id, {
      outputs: collectStepOutputs(ctx, step)
    });
    if (ctx._completedSteps) ctx._completedSteps.push(step.id);
    this._emitCheckpoint(ctx);

    // Execute children (parallel branches)
    if (step.children && step.children.length > 0 && stepType !== 'Loop' && stepType !== 'Branch') {
      await this._executeChildren(ctx, step.children, step.id);
    }
    // Move to next
    return this._moveToNext(ctx, step);
  }

  /**
   * Emit checkpoint after step completion (if callback registered).
   * Errors thrown by the callback are deliberately ignored so that a failing
   * checkpoint sink cannot abort the workflow.
   */
  _emitCheckpoint(ctx) {
    if (this._onCheckpoint) {
      try { this._onCheckpoint(ctx.checkpoint()); } catch { /* best-effort */ }
    }
  }

  // ─── Internal ──────────────────────────────────────────────

  /**
   * Follow a step's `next` link.
   * No link, 'RETURN' and 'STOP' end the chain; 'BREAK' throws LoopBreakSignal; an
   * unknown step ID ends the chain silently.
   */
  async _moveToNext(ctx, step) {
    if (!step.next || step.next === 'RETURN' || step.next === 'STOP') return;
    if (step.next === 'BREAK') throw new LoopBreakSignal();
    if (ctx.status === ExecutionStatus.Stopped || ctx.aborted) return;
    const nextStep = this.getStep(step.next);
    if (nextStep) await this.executeStep(ctx, nextStep);
  }

  /**
   * Run the child steps of a parent. Children already recorded as completed for this
   * parent are skipped (resume). One pending child runs directly on `ctx`; several run
   * through the ParallelExecutor, each in its own ChildExecutionContext.
   *
   * @param {ExecutionContext} ctx
   * @param {string[]} childIDs
   * @param {string|null} [parentStepID] — key used for branch-completion tracking
   */
  async _executeChildren(ctx, childIDs, parentStepID = null) {
    if (!childIDs || childIDs.length === 0) return;

    // Phase 2: filter out already-completed branches (for resume)
    const completedSet = parentStepID ? ctx._completedBranches.get(parentStepID) : null;
    const pendingIDs = completedSet
      ? childIDs.filter(id => !completedSet.has(id))
      : childIDs;
    if (pendingIDs.length === 0) return;

    if (pendingIDs.length === 1) {
      const child = this.getStep(pendingIDs[0]);
      if (child) {
        await this.executeStep(ctx, child);
        // Track completion
        if (parentStepID) this._markBranchCompleted(ctx, parentStepID, pendingIDs[0]);
      }
      return;
    }

    // Parallel execution
    const branches = pendingIDs.map(id => ({
      id,
      fn: async (failFastSignal) => {
        const childCtx = new ChildExecutionContext(ctx);
        if (failFastSignal) childCtx._failFastSignal = failFastSignal;
        // Copy parent local vars to child
        for (const key of ['_item', '_index', '_result', '_meta', '_error']) {
          if (ctx.localVars?.[key] !== undefined) childCtx.localVars[key] = ctx.localVars[key];
          else if (ctx.getVariable?.(key) !== undefined) childCtx.localVars[key] = ctx.getVariable(key);
        }
        const step = this.getStep(id);
        if (step) await this.executeStep(childCtx, step);
        // Track branch completion (recorded on the parent context)
        if (parentStepID) this._markBranchCompleted(ctx, parentStepID, id);
      }
    }));
    await this.parallelExecutor.execute(branches, this.errorStrategy, ctx.signal);
  }

  /** Record that `childID` finished under `parentStepID`, then emit a checkpoint. */
  _markBranchCompleted(ctx, parentStepID, childID) {
    if (!ctx._completedBranches.has(parentStepID)) {
      ctx._completedBranches.set(parentStepID, new Set());
    }
    ctx._completedBranches.get(parentStepID).add(childID);
    this._emitCheckpoint(ctx);
  }

  /**
   * Find the entry steps: every step ID that no other step references through next,
   * children, cases, default, if_true, if_false, onError or timeout.on.
   *
   * @returns {string[]} entry step IDs in declaration order (empty when there are no steps)
   */
  _findEntryNodeIDs() {
    // Guarded like the constructor and validate(), so a workflow without steps yields
    // "No entry nodes found" in execute() rather than a TypeError here
    const steps = this.workflow.steps || [];
    const referenced = new Set();
    for (const step of steps) {
      if (step.next && step.next !== 'RETURN' && step.next !== 'STOP' && step.next !== 'BREAK') referenced.add(step.next);
      for (const child of (step.children || [])) referenced.add(child);
      for (const cas of (step.cases || [])) {
        if (Array.isArray(cas) && cas.length >= 2) referenced.add(cas[1]);
        if (cas && typeof cas === 'object' && !Array.isArray(cas)) {
          const ref = cas.next || cas.step;
          if (ref) referenced.add(ref);
        }
      }
      if (step.default) referenced.add(step.default);
      if (step.if_true) referenced.add(step.if_true);
      if (step.if_false) referenced.add(step.if_false);
      if (step.onError) referenced.add(step.onError);
      if (step.timeout?.on) referenced.add(step.timeout.on);
    }
    return steps.map(s => s.id).filter(id => !referenced.has(id));
  }

  /**
   * List the file paths a step will write, used for file_start events: the `target`
   * of a Write step plus every key of an `out` mapping that starts with "/".
   *
   * @param {object} step
   * @returns {string[]}
   */
  _collectFileTargets(step) {
    const targets = [];
    if (step.target && typeof step.target === 'string' && getStepType(step.id) === 'Write') {
      targets.push(step.target);
    }
    // Only an `out` mapping has target keys; the string shorthand contributes none
    if (step.out && typeof step.out === 'object') {
      for (const target of Object.keys(step.out)) {
        if (target.startsWith('/')) targets.push(target);
      }
    }
    return targets;
  }
}
- // ─── Event Payload Helpers ────────────────────────────────────
/**
 * Truncate large values to prevent oversized event payloads.
 * If a single value serializes to more than maxBytes of UTF-8 JSON, it is replaced
 * with a summary object.
 *
 * @param {*} value — any value; only strings and objects are ever summarized
 * @param {number} [maxBytes=10240] — size limit of the JSON form, in UTF-8 bytes
 * @returns {*} `value` itself when it fits (or is not a string/object), otherwise
 *   `{ _truncated, type, length, preview }` where `length` is the JSON length in
 *   characters and `preview` its first 200 characters; values JSON.stringify
 *   rejects yield `{ _truncated, type, error: 'non-serializable' }`
 */
function summarizeIfLarge(value, maxBytes = 10240) {
  if (value === null || value === undefined) return value;
  if (typeof value !== 'object' && typeof value !== 'string') return value;

  let json;
  try {
    json = JSON.stringify(value);
  } catch {
    return { _truncated: true, type: typeof value, error: 'non-serializable' };
  }
  // JSON.stringify returns undefined (not a string) when toJSON() yields undefined;
  // there is nothing to measure, so pass the value through instead of crashing
  if (json === undefined) return value;

  // One UTF-16 code unit encodes to at most 3 UTF-8 bytes, so short strings fit for
  // sure and skip the encoding pass
  if (json.length * 3 <= maxBytes) return value;
  // Compare real UTF-8 bytes: character count undercounts non-ASCII text
  if (json.length <= maxBytes && new TextEncoder().encode(json).length <= maxBytes) return value;

  return { _truncated: true, type: typeof value, length: json.length, preview: json.slice(0, 200) + '...' };
}
/**
 * Apply summarizeIfLarge to every top-level entry of a container (one level only;
 * nested values are measured as a whole, not walked).
 *
 * @param {*} obj — plain object, array, or any other value
 * @param {number} [maxBytes=10240] — per-entry limit forwarded to summarizeIfLarge
 * @returns {*} a new object (or a new array for array input, so arrays keep their
 *   shape) with each entry summarized; non-container input is summarized directly
 */
function summarizeDeep(obj, maxBytes = 10240) {
  if (!obj || typeof obj !== 'object') return summarizeIfLarge(obj, maxBytes);
  // Keep arrays as arrays rather than turning them into index-keyed objects
  if (Array.isArray(obj)) return obj.map((entry) => summarizeIfLarge(entry, maxBytes));
  const out = {};
  for (const [k, v] of Object.entries(obj)) {
    out[k] = summarizeIfLarge(v, maxBytes);
  }
  return out;
}
/**
 * Build the `outputs` payload for a finished step by reading its `out` targets back
 * from the context.
 *
 * - no `out`: null
 * - string `out` (shorthand such as "$plan"): that single variable's value
 * - mapping `out`: keys starting with "/" become `{ _file: true, path }` metadata,
 *   keys starting with "$" or "_" are read via ctx.getVariable; other keys are omitted
 *
 * Variable values go through summarizeIfLarge.
 */
function collectStepOutputs(ctx, step) {
  const declared = step.out;
  if (!declared) return null;

  const collected = {};
  const readVariable = (name) => summarizeIfLarge(ctx.getVariable(name));

  if (typeof declared === 'string') {
    collected[declared] = readVariable(declared);
    return collected;
  }

  Object.keys(declared).forEach((key) => {
    if (key.startsWith('/')) {
      // File target — only metadata
      collected[key] = { _file: true, path: key };
      return;
    }
    if (key.startsWith('$') || key.startsWith('_')) {
      collected[key] = readVariable(key);
    }
  });
  return collected;
}
- // ─── Helpers ─────────────────────────────────────────────────
/**
 * Coerce an incoming param value to its declared registry type.
 *
 * - 'OBJECT': a string value is JSON-parsed; if parsing fails the string is kept
 * - 'STRING': a non-string value is stringified (JSON for objects, String() otherwise)
 * - any other type: the value is returned unchanged
 */
function applyParamType(type, value) {
  const alreadyString = typeof value === 'string';
  switch (type) {
    case 'OBJECT': {
      if (!alreadyString) return value;
      try {
        return JSON.parse(value);
      } catch {
        return value;
      }
    }
    case 'STRING': {
      if (alreadyString) return value;
      return typeof value === 'object' ? JSON.stringify(value) : String(value);
    }
    default:
      return value;
  }
}
/**
 * Coerce an incoming variable value to its declared registry type.
 * For the 'OBJECT' type and for types whose name starts with "[", a string value is
 * JSON-parsed; if parsing fails the original string is kept. Everything else is
 * returned unchanged.
 */
function applyVarType(type, value) {
  const expectsJSON = type === 'OBJECT' || type?.startsWith('[');
  if (!expectsJSON || typeof value !== 'string') return value;
  try {
    return JSON.parse(value);
  } catch {
    return value;
  }
}
- module.exports = { Engine };
|