package workflow

import (
	"context"
	"fmt"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
)

// ParallelErrorStrategy defines how errors are handled in parallel execution.
type ParallelErrorStrategy string

const (
	// ParallelErrorStrategyFailFast cancels the shared branch context on the
	// first branch error.
	ParallelErrorStrategyFailFast ParallelErrorStrategy = "failFast"
	// ParallelErrorStrategyCollectAll lets every branch run and reports all
	// collected errors.
	ParallelErrorStrategyCollectAll ParallelErrorStrategy = "collectAll"
	// ParallelErrorStrategyPartialSuccess lets every branch run and reports
	// success if at least one branch succeeded.
	ParallelErrorStrategyPartialSuccess ParallelErrorStrategy = "partialSuccess"
)

// BranchError represents an error from a single parallel branch.
type BranchError struct {
	BranchID    string // Child step ID or iteration index
	BranchIndex int    // Numeric index for ordering
	Error       error  // The actual error
}

// ParallelError aggregates errors from parallel executions.
type ParallelError struct {
	Errors        []BranchError // Recorded branch errors, in the order they were recorded
	Strategy      ParallelErrorStrategy
	TotalBranches int
	SuccessCount  int
}

// Error implements the error interface.
//
// With no recorded errors it returns "no errors"; with exactly one it returns
// a single-line message; otherwise it returns a multi-line summary that lists
// at most the first five branch errors followed by a count of the remainder.
func (e *ParallelError) Error() string {
	// maxListed is the number of individual branch errors spelled out before
	// the summary is truncated.
	const maxListed = 5

	failureCount := len(e.Errors)
	switch failureCount {
	case 0:
		return "no errors"
	case 1:
		return fmt.Sprintf("parallel execution failed: %s: %v", e.Errors[0].BranchID, e.Errors[0].Error)
	}

	var sb strings.Builder
	fmt.Fprintf(&sb, "%d/%d parallel branches failed:\n", failureCount, e.TotalBranches)
	for i, branchErr := range e.Errors {
		if i >= maxListed {
			fmt.Fprintf(&sb, " ... and %d more errors\n", failureCount-maxListed)
			break
		}
		fmt.Fprintf(&sb, " - %s: %v\n", branchErr.BranchID, branchErr.Error)
	}
	return sb.String()
}

// ParallelBranch represents a single parallel execution branch.
type ParallelBranch struct {
	ID string                      // Branch identifier
	Fn func(context.Context) error // Function to execute
}

// ParallelCoordinator manages parallel execution with cancellation and error
// handling. It contains a sync.WaitGroup and a sync.Mutex and therefore must
// not be copied; use the pointer returned by NewParallelCoordinator.
type ParallelCoordinator struct {
	ctx           context.Context    // Context handed to every branch; derived from the parent
	cancel        context.CancelFunc // Cancels ctx
	wg            sync.WaitGroup     // Tracks goroutines started by ExecuteBranch
	errorStrategy ParallelErrorStrategy
	errorMutex    sync.Mutex // Guards errors
	errors        []BranchError
	totalBranches int
	successCount  int32 // atomic
}

// NewParallelCoordinator creates a new parallel execution coordinator whose
// branch context is a cancellable child of parentCtx. totalBranches is only
// used for reporting in the aggregated ParallelError.
func NewParallelCoordinator(
	parentCtx context.Context,
	strategy ParallelErrorStrategy,
	totalBranches int,
) *ParallelCoordinator {
	ctx, cancel := context.WithCancel(parentCtx)
	return &ParallelCoordinator{
		ctx:           ctx,
		cancel:        cancel,
		errorStrategy: strategy,
		totalBranches: totalBranches,
	}
}

// ExecuteBranch executes a single branch in its own goroutine.
//
// If the coordinator context is already cancelled when the goroutine starts,
// fn is not called and the branch is counted neither as a success nor as a
// failure. A non-nil error from fn is recorded and, under the fail-fast
// strategy, cancels the context shared by all branches.
func (pc *ParallelCoordinator) ExecuteBranch(
	branchID string,
	branchIndex int,
	fn func(ctx context.Context) error,
) {
	pc.startBranch(branchID, branchIndex, fn, nil)
}

// startBranch is the implementation behind ExecuteBranch. release, when
// non-nil, is invoked exactly once when the goroutine exits, including when
// fn is skipped because the context was already cancelled. It runs after any
// error has been recorded and before the WaitGroup is decremented.
func (pc *ParallelCoordinator) startBranch(
	branchID string,
	branchIndex int,
	fn func(ctx context.Context) error,
	release func(),
) {
	pc.wg.Add(1)
	go func() {
		defer pc.wg.Done()
		if release != nil {
			// Deferred after wg.Done, so it runs first: once Wait returns,
			// every release hook has already run.
			defer release()
		}

		// Skip the work entirely if cancellation happened before we started.
		select {
		case <-pc.ctx.Done():
			return
		default:
		}

		err := fn(pc.ctx)
		if err == nil {
			atomic.AddInt32(&pc.successCount, 1)
			return
		}

		pc.recordError(BranchError{
			BranchID:    branchID,
			BranchIndex: branchIndex,
			Error:       err,
		})
		if pc.errorStrategy == ParallelErrorStrategyFailFast {
			pc.cancel()
		}
	}()
}

// Wait blocks until all started branches have finished, cancels the
// coordinator context to release its resources, and returns the aggregated
// result.
//
// It returns nil when no branch recorded an error, or when the strategy is
// partial-success and at least one branch succeeded. Otherwise it returns a
// *ParallelError holding a copy of the recorded errors.
func (pc *ParallelCoordinator) Wait() error {
	pc.wg.Wait()
	pc.cancel() // Clean up context

	// Snapshot under the same mutex recordError uses, so the returned slice
	// never aliases the coordinator's internal state.
	pc.errorMutex.Lock()
	branchErrors := append([]BranchError(nil), pc.errors...)
	pc.errorMutex.Unlock()

	if len(branchErrors) == 0 {
		return nil
	}

	successCount := int(atomic.LoadInt32(&pc.successCount))
	if pc.errorStrategy == ParallelErrorStrategyPartialSuccess && successCount > 0 {
		return nil
	}

	return &ParallelError{
		Errors:        branchErrors,
		Strategy:      pc.errorStrategy,
		TotalBranches: pc.totalBranches,
		SuccessCount:  successCount,
	}
}

// recordError appends a branch error while holding errorMutex.
func (pc *ParallelCoordinator) recordError(err BranchError) {
	pc.errorMutex.Lock()
	defer pc.errorMutex.Unlock()
	pc.errors = append(pc.errors, err)
}

// ParallelExecutor manages parallel execution with resource limits. The
// semaphore belongs to the executor, so the concurrency limit is shared by
// all Execute calls made on the same executor.
type ParallelExecutor struct {
	maxConcurrency int
	semaphore      chan struct{} // Buffered to maxConcurrency; one token per running branch
}

// NewParallelExecutor creates a new parallel executor with a concurrency
// limit. A non-positive maxConcurrency selects runtime.NumCPU() * 2.
func NewParallelExecutor(maxConcurrency int) *ParallelExecutor {
	if maxConcurrency <= 0 {
		maxConcurrency = runtime.NumCPU() * 2 // Sensible default
	}
	return &ParallelExecutor{
		maxConcurrency: maxConcurrency,
		semaphore:      make(chan struct{}, maxConcurrency),
	}
}

// Execute runs branches in parallel, never running more than the executor's
// concurrency limit at once, and returns the result aggregated according to
// strategy (see ParallelCoordinator.Wait).
//
// Launching stops as soon as the branch context is cancelled, either because
// ctx was cancelled or because a branch failed under the fail-fast strategy.
// Execute always waits for the branches it has already launched before
// returning. If ctx was cancelled while branches were still waiting to be
// launched, ctx.Err() is returned. An empty branches slice returns nil.
func (pe *ParallelExecutor) Execute(
	ctx context.Context,
	branches []ParallelBranch,
	strategy ParallelErrorStrategy,
) error {
	if len(branches) == 0 {
		return nil
	}

	coordinator := NewParallelCoordinator(ctx, strategy, len(branches))
	release := func() { <-pe.semaphore }

	// parentErr is set only when launching was interrupted by cancellation of
	// the caller's context (as opposed to a fail-fast cancellation).
	var parentErr error

launch:
	for i, branch := range branches {
		// Acquire a token, blocking while at max concurrency. Watching the
		// coordinator context (a child of ctx) covers both caller
		// cancellation and fail-fast cancellation.
		select {
		case pe.semaphore <- struct{}{}:
		case <-coordinator.ctx.Done():
			parentErr = ctx.Err()
			break launch
		}

		// select picks at random when both cases are ready, so re-check
		// cancellation after acquiring and hand the token straight back.
		if coordinator.ctx.Err() != nil {
			release()
			parentErr = ctx.Err()
			break
		}

		// The token is released when the branch goroutine exits, even if the
		// branch function is skipped, so tokens cannot leak.
		coordinator.startBranch(branch.ID, i, branch.Fn, release)
	}

	err := coordinator.Wait()
	if parentErr != nil {
		return parentErr
	}
	return err
}

// EngineOptions configures the workflow engine.
type EngineOptions struct {
	MaxConcurrency        int                   // Max concurrent goroutines (default: runtime.NumCPU() * 2)
	ParallelErrorStrategy ParallelErrorStrategy // Error handling strategy (default: fail-fast)
	EventBufferSize       int                   // Event stream buffer size (default: 1000)
}

// DefaultEngineOptions provides sensible defaults.
var DefaultEngineOptions = EngineOptions{
	MaxConcurrency:        runtime.NumCPU() * 2,
	ParallelErrorStrategy: ParallelErrorStrategyFailFast,
	EventBufferSize:       1000,
}