| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- package workflow
- import (
- "context"
- "fmt"
- "runtime"
- "strings"
- "sync"
- "sync/atomic"
- )
// ParallelErrorStrategy selects how a parallel execution reacts when one or
// more of its branches return an error.
type ParallelErrorStrategy string

// Supported ParallelErrorStrategy values.
const (
	// ParallelErrorStrategyFailFast cancels the remaining branches as soon as
	// any branch fails.
	ParallelErrorStrategyFailFast ParallelErrorStrategy = "failFast"

	// ParallelErrorStrategyCollectAll lets every branch run and reports all
	// failures together.
	ParallelErrorStrategyCollectAll ParallelErrorStrategy = "collectAll"

	// ParallelErrorStrategyPartialSuccess lets every branch run and treats the
	// execution as successful when at least one branch succeeded.
	ParallelErrorStrategyPartialSuccess ParallelErrorStrategy = "partialSuccess"
)
// BranchError describes the failure of one parallel branch.
type BranchError struct {
	// BranchID identifies the branch, e.g. a child step ID or an iteration index.
	BranchID string
	// BranchIndex is the branch's numeric position, usable for ordering.
	BranchIndex int
	// Error is the error the branch returned.
	Error error
}
- // ParallelError aggregates errors from parallel executions
- type ParallelError struct {
- Errors []BranchError
- Strategy ParallelErrorStrategy
- TotalBranches int
- SuccessCount int
- }
- // Error implements the error interface
- func (e *ParallelError) Error() string {
- failureCount := len(e.Errors)
- if failureCount == 0 {
- return "no errors"
- }
- if failureCount == 1 {
- return fmt.Sprintf("parallel execution failed: %s: %v",
- e.Errors[0].BranchID, e.Errors[0].Error)
- }
- var sb strings.Builder
- sb.WriteString(fmt.Sprintf("%d/%d parallel branches failed:\n",
- failureCount, e.TotalBranches))
- for i, branchErr := range e.Errors {
- if i >= 5 {
- sb.WriteString(fmt.Sprintf(" ... and %d more errors\n", failureCount-5))
- break
- }
- sb.WriteString(fmt.Sprintf(" - %s: %v\n", branchErr.BranchID, branchErr.Error))
- }
- return sb.String()
- }
// ParallelBranch is one unit of work handed to ParallelExecutor.Execute.
type ParallelBranch struct {
	// ID names the branch in error reports.
	ID string
	// Fn performs the branch's work; it receives the shared execution context.
	Fn func(context.Context) error
}
- // ParallelCoordinator manages parallel execution with cancellation and error handling
- type ParallelCoordinator struct {
- ctx context.Context
- cancel context.CancelFunc
- wg sync.WaitGroup
- errorStrategy ParallelErrorStrategy
- errorMutex sync.Mutex
- errors []BranchError
- totalBranches int
- successCount int32 // atomic
- }
- // NewParallelCoordinator creates a new parallel execution coordinator
- func NewParallelCoordinator(
- parentCtx context.Context,
- strategy ParallelErrorStrategy,
- totalBranches int,
- ) *ParallelCoordinator {
- ctx, cancel := context.WithCancel(parentCtx)
- return &ParallelCoordinator{
- ctx: ctx,
- cancel: cancel,
- errorStrategy: strategy,
- errors: make([]BranchError, 0),
- totalBranches: totalBranches,
- successCount: 0,
- }
- }
- // ExecuteBranch executes a single branch in a goroutine
- func (pc *ParallelCoordinator) ExecuteBranch(
- branchID string,
- branchIndex int,
- fn func(ctx context.Context) error,
- ) {
- pc.wg.Add(1)
- go func() {
- defer pc.wg.Done()
- // Check if already cancelled before starting
- select {
- case <-pc.ctx.Done():
- return
- default:
- }
- // Execute branch function
- err := fn(pc.ctx)
- if err != nil {
- pc.recordError(BranchError{
- BranchID: branchID,
- BranchIndex: branchIndex,
- Error: err,
- })
- // Cancel other branches if fail-fast
- if pc.errorStrategy == ParallelErrorStrategyFailFast {
- pc.cancel()
- }
- } else {
- // Increment success count
- atomic.AddInt32(&pc.successCount, 1)
- }
- }()
- }
- // Wait waits for all branches to complete and returns aggregated error
- func (pc *ParallelCoordinator) Wait() error {
- pc.wg.Wait()
- pc.cancel() // Clean up context
- successCount := int(atomic.LoadInt32(&pc.successCount))
- // Handle different error strategies
- if len(pc.errors) == 0 {
- return nil
- }
- // For partial success strategy, succeed if at least one branch succeeded
- if pc.errorStrategy == ParallelErrorStrategyPartialSuccess && successCount > 0 {
- return nil
- }
- // Return aggregated error
- return &ParallelError{
- Errors: pc.errors,
- Strategy: pc.errorStrategy,
- TotalBranches: pc.totalBranches,
- SuccessCount: successCount,
- }
- }
- // recordError records an error from a branch (thread-safe)
- func (pc *ParallelCoordinator) recordError(err BranchError) {
- pc.errorMutex.Lock()
- defer pc.errorMutex.Unlock()
- pc.errors = append(pc.errors, err)
- }
// ParallelExecutor runs groups of branches while capping how many branch
// functions execute at once. The cap is enforced by a buffered channel used
// as a counting semaphore: sending takes a slot, receiving frees it.
type ParallelExecutor struct {
	// maxConcurrency is the configured cap and the semaphore's capacity.
	maxConcurrency int
	// semaphore holds one element per slot currently in use.
	semaphore chan struct{}
}
- // NewParallelExecutor creates a new parallel executor with concurrency limit
- func NewParallelExecutor(maxConcurrency int) *ParallelExecutor {
- if maxConcurrency <= 0 {
- maxConcurrency = runtime.NumCPU() * 2 // Sensible default
- }
- return &ParallelExecutor{
- maxConcurrency: maxConcurrency,
- semaphore: make(chan struct{}, maxConcurrency),
- }
- }
- // Execute executes branches in parallel with concurrency limit
- func (pe *ParallelExecutor) Execute(
- ctx context.Context,
- branches []ParallelBranch,
- strategy ParallelErrorStrategy,
- ) error {
- if len(branches) == 0 {
- return nil
- }
- coordinator := NewParallelCoordinator(ctx, strategy, len(branches))
- for i, branch := range branches {
- branchID := branch.ID
- branchIndex := i
- branchFn := branch.Fn
- // Acquire semaphore (blocks if at max concurrency)
- select {
- case pe.semaphore <- struct{}{}:
- // Got token, proceed
- case <-ctx.Done():
- // Context cancelled while waiting
- coordinator.cancel()
- return ctx.Err()
- }
- // Execute branch with semaphore release
- coordinator.ExecuteBranch(branchID, branchIndex, func(ctx context.Context) error {
- defer func() {
- <-pe.semaphore // Release semaphore
- }()
- return branchFn(ctx)
- })
- }
- return coordinator.Wait()
- }
- // EngineOptions configures the workflow engine
- type EngineOptions struct {
- MaxConcurrency int // Max concurrent goroutines (default: runtime.NumCPU() * 2)
- ParallelErrorStrategy ParallelErrorStrategy // Error handling strategy (default: fail-fast)
- EventBufferSize int // Event stream buffer size (default: 1000)
- }
- // DefaultEngineOptions provides sensible defaults
- var DefaultEngineOptions = EngineOptions{
- MaxConcurrency: runtime.NumCPU() * 2,
- ParallelErrorStrategy: ParallelErrorStrategyFailFast,
- EventBufferSize: 1000,
- }
|