parallel.js 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. /**
  2. * VL Workflow Engine — Parallel Executor
  3. * Ported from Go: workflow/parallel.go
  4. * Spec: v3.15
  5. *
  6. * Strategies:
  7. * failFast — cancel all on first error
  8. * collectAll — run all, collect all errors
  9. * partialSuccess — run all, succeed if any succeed
  10. */
  11. const { ParallelErrorStrategy } = require('./types');
  12. const os = require('os');
  13. class ParallelError extends Error {
  14. constructor(errors, strategy, totalBranches, successCount) {
  15. const msgs = errors.map(e => `[${e.branchID}] ${e.error.message}`);
  16. super(`Parallel execution failed (${strategy}): ${msgs.join('; ')}`);
  17. this.name = 'ParallelError';
  18. this.errors = errors; // [{branchID, branchIndex, error}]
  19. this.strategy = strategy;
  20. this.totalBranches = totalBranches;
  21. this.successCount = successCount;
  22. }
  23. }
  24. class ParallelExecutor {
  25. constructor(maxConcurrency = 0) {
  26. this.maxConcurrency = maxConcurrency > 0 ? maxConcurrency : os.cpus().length * 2;
  27. }
  28. /**
  29. * Execute branches in parallel with error strategy.
  30. * @param {Array<{id: string, fn: () => Promise<void>}>} branches
  31. * @param {string} strategy
  32. * @param {AbortSignal} signal — optional abort signal
  33. * @returns {Promise<void>}
  34. */
  35. async execute(branches, strategy = ParallelErrorStrategy.FailFast, signal = null) {
  36. if (branches.length === 0) return;
  37. if (branches.length === 1) {
  38. // Optimization: single branch, run directly
  39. await branches[0].fn();
  40. return;
  41. }
  42. const errors = [];
  43. let successCount = 0;
  44. let abortController = null;
  45. // For failFast: create an internal abort controller
  46. if (strategy === ParallelErrorStrategy.FailFast) {
  47. abortController = new AbortController();
  48. }
  49. // Semaphore for concurrency limiting
  50. let running = 0;
  51. const queue = [];
  52. const acquire = () => new Promise(resolve => {
  53. if (running < this.maxConcurrency) { running++; resolve(); }
  54. else queue.push(resolve);
  55. });
  56. const release = () => {
  57. running--;
  58. if (queue.length > 0) { running++; queue.shift()(); }
  59. };
  60. const promises = branches.map(async (branch, index) => {
  61. await acquire();
  62. try {
  63. // Check abort before starting
  64. if (signal?.aborted) return;
  65. if (abortController?.signal.aborted) return;
  66. await branch.fn(abortController?.signal);
  67. successCount++;
  68. } catch (err) {
  69. errors.push({ branchID: branch.id, branchIndex: index, error: err });
  70. if (strategy === ParallelErrorStrategy.FailFast && abortController) {
  71. abortController.abort();
  72. }
  73. } finally {
  74. release();
  75. }
  76. });
  77. await Promise.allSettled(promises);
  78. // Evaluate result based on strategy
  79. if (errors.length === 0) return;
  80. if (strategy === ParallelErrorStrategy.PartialSuccess && successCount > 0) {
  81. return; // At least one succeeded
  82. }
  83. throw new ParallelError(errors, strategy, branches.length, successCount);
  84. }
  85. }
  86. module.exports = { ParallelExecutor, ParallelError };