| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- /**
- * VL Workflow Engine — Parallel Executor
- * Ported from Go: workflow/parallel.go
- * Spec: v3.15
- *
- * Strategies:
- * failFast — cancel all on first error
- * collectAll — run all, collect all errors
- * partialSuccess — run all, succeed if any succeed
- */
- const { ParallelErrorStrategy } = require('./types');
- const os = require('os');
/**
 * Aggregate error describing the failed branches of one parallel run.
 *
 * The message has the form
 * `Parallel execution failed (<strategy>): [<branchID>] <reason>; ...`.
 *
 * Properties:
 *   errors        — the `[{branchID, branchIndex, error}]` array passed in
 *   strategy      — the error strategy that was in effect
 *   totalBranches — number of branches submitted
 *   successCount  — number of branches that completed successfully
 */
class ParallelError extends Error {
  /**
   * @param {Array<{branchID: string, branchIndex: number, error: *}>} errors
   * @param {string} strategy
   * @param {number} totalBranches
   * @param {number} successCount
   */
  constructor(errors, strategy, totalBranches, successCount) {
    // JavaScript can throw anything (strings, null, plain objects), so do not
    // assume `error` is an Error: fall back to its string form when it has no
    // `message`. Without this, a branch that threw `null` would make this
    // constructor itself throw a TypeError and hide the real failure.
    const describe = (thrown) => thrown?.message ?? String(thrown);
    const details = errors.map((e) => `[${e.branchID}] ${describe(e.error)}`);

    super(`Parallel execution failed (${strategy}): ${details.join('; ')}`);
    this.name = 'ParallelError';
    this.errors = errors; // [{branchID, branchIndex, error}]
    this.strategy = strategy;
    this.totalBranches = totalBranches;
    this.successCount = successCount;
  }
}
/**
 * Runs a set of async branches concurrently, bounded by a concurrency limit,
 * and resolves or rejects according to a ParallelErrorStrategy.
 */
class ParallelExecutor {
  /**
   * @param {number} [maxConcurrency=0] — maximum number of branches running
   *   at once. Any value that is not greater than 0 selects the default of
   *   twice the number of CPUs reported by `os.cpus()`.
   */
  constructor(maxConcurrency = 0) {
    // Clamp the default to at least 1: if the CPU list were ever empty the
    // limit would be 0, the semaphore below would never grant a slot, and
    // execute() would hang forever.
    this.maxConcurrency = maxConcurrency > 0
      ? maxConcurrency
      : Math.max(1, os.cpus().length * 2);
  }

  /**
   * Execute branches in parallel with an error strategy.
   *
   * Each branch function is called with an AbortSignal. In the multi-branch
   * path that signal aborts when the caller's `signal` aborts and, under the
   * FailFast strategy, when any branch fails. Branches that have not started
   * by the time of an abort are skipped; skipped branches record no error.
   *
   * A single branch is run directly: its error is rethrown as-is (not wrapped
   * in ParallelError) and it receives the caller's `signal` unchanged.
   *
   * @param {Array<{id: string, fn: (signal?: AbortSignal) => Promise<void>}>} branches
   * @param {string} strategy — a ParallelErrorStrategy value
   * @param {AbortSignal} signal — optional abort signal
   * @returns {Promise<void>} resolves when no branch failed, or when the
   *   strategy is PartialSuccess and at least one branch succeeded
   * @throws {ParallelError} when branches failed and the strategy does not
   *   tolerate it (multi-branch path only)
   */
  async execute(branches, strategy = ParallelErrorStrategy.FailFast, signal = null) {
    if (branches.length === 0) return;

    // Already cancelled: nothing may start. Same outcome as skipping every
    // branch individually.
    if (signal?.aborted) return;

    if (branches.length === 1) {
      // Optimization: single branch, run directly
      await branches[0].fn(signal ?? undefined);
      return;
    }

    const failFast = strategy === ParallelErrorStrategy.FailFast;
    const errors = [];
    let successCount = 0;

    // One internal controller feeds every branch. It is aborted by the
    // caller's signal and, for FailFast, by the first branch failure.
    const controller = new AbortController();
    const onExternalAbort = () => controller.abort(signal.reason);
    if (signal) {
      signal.addEventListener('abort', onExternalAbort, { once: true });
    }

    // Semaphore for concurrency limiting
    let running = 0;
    const waiters = [];
    const acquire = () => new Promise((resolve) => {
      if (running < this.maxConcurrency) {
        running++;
        resolve();
      } else {
        waiters.push(resolve);
      }
    });
    const release = () => {
      running--;
      if (waiters.length > 0) {
        // Hand the freed slot straight to the next waiter.
        running++;
        waiters.shift()();
      }
    };

    const tasks = branches.map(async (branch, index) => {
      await acquire();
      try {
        // Check abort before starting
        if (controller.signal.aborted) return;
        await branch.fn(controller.signal);
        successCount++;
      } catch (err) {
        errors.push({ branchID: branch.id, branchIndex: index, error: err });
        if (failFast) controller.abort();
      } finally {
        release();
      }
    });

    try {
      await Promise.allSettled(tasks);
    } finally {
      // Do not leave a listener behind on a long-lived caller signal.
      signal?.removeEventListener('abort', onExternalAbort);
    }

    // Evaluate result based on strategy
    if (errors.length === 0) return;
    if (strategy === ParallelErrorStrategy.PartialSuccess && successCount > 0) {
      return; // At least one succeeded
    }
    throw new ParallelError(errors, strategy, branches.length, successCount);
  }
}
- module.exports = { ParallelExecutor, ParallelError };
|