/**
 * VL Workflow Engine — Parallel Executor
 * Ported from Go: workflow/parallel.go
 * Spec: v3.15
 *
 * Strategies:
 *   failFast       — cancel all on first error
 *   collectAll     — run all, collect all errors
 *   partialSuccess — run all, succeed if any succeed
 */

const { ParallelErrorStrategy } = require('./types');
const os = require('os');

/**
 * Aggregate error thrown by ParallelExecutor#execute when the chosen strategy
 * considers the run a failure.
 */
class ParallelError extends Error {
  /**
   * @param {Array<{branchID: string, branchIndex: number, error: *}>} errors
   *   One record per failed branch.
   * @param {string} strategy - Strategy that was in effect for the run.
   * @param {number} totalBranches - Number of branches submitted.
   * @param {number} successCount - Number of branches that completed without throwing.
   */
  constructor(errors, strategy, totalBranches, successCount) {
    // A branch may throw anything (string, null, plain object), so fall back to
    // String(error) when there is no `.message` instead of crashing here.
    const msgs = errors.map(
      ({ branchID, error }) => `[${branchID}] ${error?.message ?? String(error)}`,
    );
    super(`Parallel execution failed (${strategy}): ${msgs.join('; ')}`);
    this.name = 'ParallelError';
    this.errors = errors; // [{branchID, branchIndex, error}]
    this.strategy = strategy;
    this.totalBranches = totalBranches;
    this.successCount = successCount;
  }
}

/**
 * Runs a set of branches concurrently, bounded by a maximum concurrency, and
 * reports failures according to a ParallelErrorStrategy.
 */
class ParallelExecutor {
  /**
   * @param {number} [maxConcurrency=0] - Maximum number of branches running at
   *   once. Any value that is not greater than zero selects the default of
   *   twice the number of logical CPUs reported by the OS.
   */
  constructor(maxConcurrency = 0) {
    // os.cpus() can return an empty array when CPU information is unavailable;
    // clamp to 1 so the semaphore below can never start with zero permits
    // (which would make execute() wait forever).
    this.maxConcurrency = maxConcurrency > 0
      ? maxConcurrency
      : Math.max(1, os.cpus().length * 2);
  }

  /**
   * Execute branches in parallel with error strategy.
   *
   * Branches that have not started when the run is aborted (by `signal`, or by
   * the first failure under failFast) are skipped; a skipped branch counts as
   * neither a success nor an error. A run with no recorded errors resolves
   * normally even if it was aborted.
   *
   * With exactly one branch the function is awaited directly and any error it
   * throws propagates as-is (it is not wrapped in ParallelError).
   *
   * @param {Array<{id: string, fn: (signal?: AbortSignal) => Promise}>} branches
   * @param {string} strategy
   * @param {AbortSignal} signal — optional abort signal
   * @returns {Promise}
   * @throws {ParallelError} when at least one branch failed and the strategy
   *   does not tolerate it (partialSuccess tolerates failures if any branch
   *   succeeded).
   */
  async execute(branches, strategy = ParallelErrorStrategy.FailFast, signal = null) {
    if (branches.length === 0) return;

    if (branches.length === 1) {
      // Optimization: single branch, run directly. Still honour an
      // already-aborted signal so behaviour matches the multi-branch path.
      if (signal?.aborted) return;
      await branches[0].fn(signal ?? undefined);
      return;
    }

    const errors = [];
    let successCount = 0;

    // One internal controller carries both cancellation sources to the
    // branches: the first failure (failFast) and the caller's external signal.
    const isFailFast = strategy === ParallelErrorStrategy.FailFast;
    const controller = isFailFast || signal ? new AbortController() : null;

    const onExternalAbort = () => controller.abort();
    if (signal) {
      if (signal.aborted) {
        controller.abort();
      } else if (typeof signal.addEventListener === 'function') {
        signal.addEventListener('abort', onExternalAbort, { once: true });
      }
    }

    // Counting semaphore for concurrency limiting.
    let running = 0;
    const waiters = [];
    const acquire = () => new Promise((resolve) => {
      if (running < this.maxConcurrency) {
        running += 1;
        resolve();
      } else {
        waiters.push(resolve);
      }
    });
    const release = () => {
      const next = waiters.shift();
      if (next) {
        next(); // hand the permit straight to the next waiter
      } else {
        running -= 1;
      }
    };

    const runBranch = async (branch, index) => {
      await acquire();
      try {
        // Check abort before starting.
        if (signal?.aborted || controller?.signal.aborted) return;
        await branch.fn(controller?.signal);
        successCount += 1;
      } catch (err) {
        errors.push({ branchID: branch.id, branchIndex: index, error: err });
        if (isFailFast) controller.abort();
      } finally {
        release();
      }
    };

    try {
      await Promise.allSettled(branches.map(runBranch));
    } finally {
      // Do not leave a listener behind on a long-lived caller signal.
      if (signal && typeof signal.removeEventListener === 'function') {
        signal.removeEventListener('abort', onExternalAbort);
      }
    }

    // Evaluate result based on strategy.
    if (errors.length === 0) return;
    if (strategy === ParallelErrorStrategy.PartialSuccess && successCount > 0) {
      return; // At least one succeeded
    }
    throw new ParallelError(errors, strategy, branches.length, successCount);
  }
}

module.exports = { ParallelExecutor, ParallelError };