# v2.3 Workflow Engine - Go Library

A comprehensive Golang implementation of the v2.3 Workflow specification for orchestrating multi-step processes, with support for pause/resume, parallel execution, and flexible control flow.

## Overview

This library implements a stateless workflow orchestration engine that supports:

- **Multi-step Output**: Real-time streaming of events and progress
- **Pause/Resume**: Save workflow state and resume from any point
- **Control Flow**: Branching, looping, parallel execution
- **Flexible Execution**: Support for services, components, LLM calls, and custom logic
- **Stateless Design**: All state is externalized for easy persistence
- **Type Safety**: Strong typing with validation

## Recent Updates

### Latest

- **Value Evaluation Rules**: New `=` prefix convention for distinguishing literal values from expressions in step inputs, outputs, and values. Strings without an `=` prefix are treated as literals; strings with an `=` prefix are evaluated as expressions; a `==` prefix escapes a literal `=`.
- **LiteLLM BYOK Support**: New `RequestAPIKey` config option sends an `api_key` field in the JSON request body for Bring Your Own Key passthrough to downstream providers.
- **Doc Adapter**: Inject semantic documents into LLM system prompts via the `docs` parameter and registry declarations.
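The sketch below makes the `=` prefix rules concrete by showing one way a raw step value could be classified before evaluation. `classifyValue`, `ValueKind`, and its constants are hypothetical names chosen for illustration. They are not part of this library's API, and the real engine may structure this logic differently.

```go
package main

import (
	"fmt"
	"strings"
)

// ValueKind describes how a step value is treated under the "=" prefix rules.
type ValueKind int

const (
	KindLiteral    ValueKind = iota // non-string value, or string without "=" prefix
	KindExpression                  // string with a single "=" prefix
	KindEscaped                     // string with "==" prefix: literal with one "=" stripped
)

// classifyValue (hypothetical) returns the kind of a raw value plus its payload:
// either the literal to use as-is, or the expression text to hand to an evaluator.
func classifyValue(v interface{}) (ValueKind, interface{}) {
	s, ok := v.(string)
	if !ok {
		// bool, number, nil, map, slice: always literal
		return KindLiteral, v
	}
	switch {
	case strings.HasPrefix(s, "=="): // must be checked before "="
		return KindEscaped, s[1:] // "==foo" -> "=foo"
	case strings.HasPrefix(s, "="):
		return KindExpression, s[1:] // "=$name" -> "$name"
	default:
		return KindLiteral, s
	}
}

func main() {
	for _, v := range []interface{}{"hello", "=$name", "==foo", true, 42} {
		kind, payload := classifyValue(v)
		fmt.Printf("%v -> kind=%d payload=%v\n", v, kind, payload)
	}
}
```

The order of the checks matters. `==` must be tested before `=`; otherwise an escaped literal would be misread as an expression.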
### v2.3 Release

- **S3 File Adapter**: AWS S3 storage support for workflow files and artifacts (compatible with MinIO and other S3-compatible services)
- **LiteLLM Adapter**: Integration with the LiteLLM proxy for unified LLM API access across multiple providers
- **Explicit Pause Steps**: New `Pause_*` step type for explicit workflow pausing (in addition to service-triggered pauses)
- **Improved Completion Behavior**: Steps without a `next` field are treated as completed (not paused), allowing child branches to complete naturally
- **Bug Fixes**:
  - Fixed error type checking in adapters
  - URL-encode S3 `CopySource` for special characters
  - Increased SSE scanner buffer to 1MB for large LLM responses
- **Documentation**: Added a known limitations section for adapter implementations

## Architecture

### Core Components

1. **Engine** - Main workflow execution orchestrator
2. **Registry** - Declares external dependencies and boundaries
3. **Expression Evaluator** - Evaluates workflow expressions and variable paths
4. **Adapters** - Interfaces for external services, components, LLMs, and file operations
5. **Steps** - Various node types for different operations

## Installation

```bash
go get -u workflow
```

### Optional Dependencies

For S3 file storage support:

```bash
go get github.com/aws/aws-sdk-go-v2
go get github.com/aws/aws-sdk-go-v2/config
go get github.com/aws/aws-sdk-go-v2/service/s3
```

The LiteLLM adapter uses only the standard library HTTP client (no additional dependencies required).

## Quick Start

### 1. Define a Workflow

```go
package main

import (
	"context"
	"fmt"

	"workflow"
)

func main() {
	// Define workflow structure
	wf := &workflow.Workflow{
		Version: "2.3",
		Name:    "Simple Task Workflow",
		Registry: workflow.Registry{
			Services: []string{
				"TaskService(input(STRING)) RETURN result(OBJECT)",
			},
			Components: []string{},
			Vars: []string{
				"$input(STRING)",
				"$result(OBJECT)",
			},
			Files: workflow.FilesRegistry{
				Inputs:    []string{},
				Artifacts: []string{"Process/Artifacts/*"},
			},
		},
		Steps: []workflow.Step{
			{
				ID: "Service_TaskService",
				In: workflow.StepInput{
					"input": "=$input",
				},
				Out: workflow.StepOutput{
					"$result": "=_result.result",
				},
				Next: "Stop_End",
			},
			{
				ID: "Stop_End",
			},
		},
	}

	// Setup and execute: the snippets in steps 2 and 3 continue inside main()
	// and use the context and fmt imports.
}
```

### 2. Create Adapters

```go
// Create service adapter
serviceAdapter := workflow.NewDefaultServiceAdapter()
serviceAdapter.RegisterHandler("TaskService", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) {
	// Check the type so a bad input returns an error instead of panicking
	input, ok := params["input"].(string)
	if !ok {
		return nil, fmt.Errorf("TaskService: input must be a string, got %T", params["input"])
	}
	return &workflow.ServiceResult{
		Data: map[string]interface{}{
			"result": map[string]interface{}{
				"status":  "completed",
				"message": fmt.Sprintf("Processed: %s", input),
			},
		},
	}, nil
})

// Create adapters bundle (using default adapters)
adapters := &workflow.Adapters{
	Service:   serviceAdapter,
	Component: workflow.NewDefaultComponentAdapter(),
	LLM:       workflow.NewDefaultLLMAdapter(),
	File:      workflow.NewDefaultFileAdapter(),
}

// Or use specialized adapters (S3 for files, LiteLLM for LLM calls).
// See "Built-in Adapters" below for S3FileAdapter and LiteLLMAdapter.
```

### 3. Execute Workflow

```go
// Create engine
engine, err := workflow.NewEngine(wf)
if err != nil {
	panic(err)
}

// Execute with initial variables
initialVars := map[string]interface{}{
	"$input": "Hello, Workflow!",
}

result, err := engine.Execute(context.Background(), initialVars, adapters)
if err != nil {
	panic(err)
}

// Process events
for event := range result.EventStream {
	fmt.Printf("Event: %s - %s\n", event.Type, event.StepID)
}
```

## Step Types

### Service Steps (Service_*)

Call project services with input/output mapping.

```go
{
	ID: "Service_MyService",
	In: workflow.StepInput{
		"param1": "=$variable1",   // expression: evaluates $variable1
		"param2": "literal value", // literal string (no = prefix)
		"param3": true,            // literal bool
	},
	Out: workflow.StepOutput{
		"$result": "=_result.data",
	},
	Next: "NextStep",
}
```

### Component Steps (Component_*)

Call system components (MCP, file operations, etc.).

```go
{
	ID: "Component_FileOps",
	In: workflow.StepInput{
		"operation": "read",
		"path":      "=$filePath",
	},
	Out: workflow.StepOutput{
		"$fileContent": "=_result.content",
	},
	Next: "NextStep",
}
```

### LLM Steps (LLM_*)

Call large language models with streaming support.

```go
{
	ID: "LLM_Generate",
	In: workflow.StepInput{
		"model":  "gpt-4", // literal string
		"stream": true,    // literal bool
		"messages": []interface{}{
			map[string]interface{}{
				"role":    "user",
				"content": "=$prompt",
			},
		},
	},
	Out: workflow.StepOutput{
		"$generatedText": "=_result.content",
	},
	Next: "NextStep",
}
```

### Set Steps (Set_*)

Update global variables.

```go
{
	ID:     "Set_UpdateStatus",
	Target: "$status",
	Value:  "completed", // literal string (no = prefix needed)
	Next:   "NextStep",
}

// To set a variable from an expression:
{
	ID:     "Set_Total",
	Target: "$total",
	Value:  "=$price * $quantity", // expression (= prefix)
	Next:   "NextStep",
}
```

### Write Steps (Write_*)

Write files to the artifacts directory.
```go
{
	ID:     "Write_Output",
	Target: "Process/Artifacts/output.txt", // literal path (no = prefix)
	Value:  "=$content",                    // expression (= prefix)
	Mode:   "overwrite",                    // or "append", "failIfExists"
	Next:   "NextStep",
}
```

### Branch Steps (Branch_*)

Conditional branching based on expressions.

```go
{
	ID: "Branch_CheckAmount",
	Cases: [][]string{
		{"$amount < 1000", "Service_AutoApprove"},
		{"$amount < 10000", "Service_ManagerApprove"},
		{"ELSE", "Service_DirectorApprove"},
	},
	Next: "Stop_End",
}
```

### Loop Steps (Loop_*)

Iterate over collections with serial or parallel execution.

```go
{
	ID:       "Loop_ProcessItems",
	Mode:     "parallel", // or "serial"
	Source:   "$items",
	Children: []string{"Service_ProcessOne"},
	Next:     "Stop_End",
}
```

### Stop Steps (Stop_*)

Explicitly terminate workflow execution.

```go
{
	ID: "Stop_End",
}
```

### Pause Steps (Pause_*)

Explicitly pause workflow execution, waiting for an external resume.

```go
{
	ID: "Pause_WaitApproval",
}
```

**Note:** Workflows only pause when:

1. Reaching a `Pause_*` step
2. A service returns `StatusPatch.Control.Pause = true`

When a step has no `next`, the branch is considered **completed** (not paused). Child branches ending without a `next` are considered complete, allowing the parent to continue.
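The pause and completion rules above can be summarized as a small decision function. This is a sketch under stated assumptions, not the engine's implementation. `decideOutcome`, `Outcome`, and its constants are hypothetical. The step kind is inferred from the ID prefix, as in the examples above. `servicePause` stands in for a service returning `StatusPatch.Control.Pause = true`.

```go
package main

import (
	"fmt"
	"strings"
)

// Outcome is what happens to a branch after a step finishes (hypothetical type).
type Outcome string

const (
	OutcomeContinue Outcome = "continue" // follow the step's next
	OutcomePause    Outcome = "pause"    // stop here and wait for Resume
	OutcomeStop     Outcome = "stop"     // Stop_* step terminates the workflow
	OutcomeComplete Outcome = "complete" // no next: this branch is done, not paused
)

// decideOutcome encodes the documented rules: only Pause_* steps and a service
// pause directive pause the workflow; a missing next means "completed".
func decideOutcome(stepID, next string, servicePause bool) Outcome {
	switch {
	case strings.HasPrefix(stepID, "Pause_"):
		return OutcomePause
	case strings.HasPrefix(stepID, "Stop_"):
		return OutcomeStop
	case servicePause:
		return OutcomePause
	case next == "":
		return OutcomeComplete
	default:
		return OutcomeContinue
	}
}

func main() {
	fmt.Println(decideOutcome("Pause_WaitApproval", "", false))      // pause
	fmt.Println(decideOutcome("Service_Review", "Stop_End", true))   // pause
	fmt.Println(decideOutcome("Service_ProcessOne", "", false))      // complete
	fmt.Println(decideOutcome("Service_A", "Service_B", false))      // continue
}
```

The third call is the case the note emphasizes. A child step with no `next` reports `complete`, so the parent branch can keep going instead of the whole workflow pausing.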
## Features

### Value Evaluation Rules

Step field values (`In`, `Out`, `Value`, and `Target` for Write steps) follow these rules:

| Value | Treatment | Result |
|-------|-----------|--------|
| Non-string JSON types (bool, number, null, object, array) | Literal | Returned as-is |
| String without `=` prefix | Literal string | `"hello"` → `"hello"` |
| String with `=` prefix | Expression | `"=$name"` → evaluates `$name` |
| String with `==` prefix | Escaped literal | `"==foo"` → `"=foo"` |

```go
// Literal values (no = prefix)
"hello world"        // literal string "hello world"
"Process/output.txt" // literal string path

// Expression values (= prefix)
"=$variable"           // evaluates variable reference
"=$object.field"       // evaluates nested path
"=$array[0]"           // evaluates array index
"=$age + 5"            // evaluates arithmetic
"=$status == \"done\"" // evaluates comparison

// Escaped literal (== prefix, strips one =)
"==something" // literal string "=something"

// Non-string types are always literal
true                        // literal bool
42                          // literal number
map[string]interface{}{...} // literal object
```

**Note:** `If` conditions, `Source` expressions, and `Cases` expressions are always evaluated as expressions directly (no `=` prefix needed).

### Expression Language

The expression language (used after the `=` prefix, or directly in `If`/`Source`/`Cases`) supports:

```go
// Variable references
"$variable"
"$object.field"
"$array[0]"
"$array[_index]"

// Comparisons
"$amount > 1000"
"$status == \"approved\""
"$count >= 10"

// Logical operators
"$approved == true && $amount < 5000"
"$error != null || $retry > 3"

// Arithmetic
"$total + 100"
"$price * $quantity"
"\"Result: \" + $value"

// Literals (within expressions)
"\"string literal\""
"42"
"true"
"false"
```

### Variables

Three types of variables:

1. **Global Variables** (`$xxx`): Declared in the registry, accessible throughout the workflow
2. **System Variables** (`SYSVAR.xxx`): Read-only configuration from the system
3. **Local Variables** (`_item`, `_index`, `_result`): Loop- and step-scoped variables

### Conditional Execution

Use the `if` property to conditionally execute steps:

```go
{
	ID: "Service_Notify",
	If: "$shouldNotify == true", // If is always an expression (no = prefix)
	In: workflow.StepInput{
		"message": "=$notificationText", // In values use = prefix for expressions
	},
	Next: "Stop_End",
}
```

### Pause and Resume

Workflows can pause for external input (e.g., human approval). Pausing requires an explicit action:

**Option 1: Use a `Pause_*` step**

```go
{
	ID: "Pause_WaitApproval",
}
```

**Option 2: A service can trigger a pause via `StatusPatch`**

```go
return &workflow.ServiceResult{
	Data: map[string]interface{}{
		"status": "pending",
	},
	StatusPatch: &workflow.StatusPatch{
		Vars: map[string]interface{}{
			"$approvalStatus": "pending",
		},
		Control: workflow.ControlDirective{
			Pause:  true,
			Reason: "WAIT_APPROVAL",
		},
	},
}, nil
```

**Resume later with updated state:**

```go
resumeInput := &workflow.ResumeInput{
	State: pauseState, // From the paused event
	Patch: map[string]interface{}{
		"$approved":       true,
		"$approvalStatus": "approved",
	},
}

result, err := engine.Resume(context.Background(), resumeInput, adapters)
if err != nil {
	panic(err)
}
```

**Important:** Steps without a `next` field are considered **completed**, not paused. This allows child branches to complete naturally without pausing the entire workflow.
### Event Stream

Real-time workflow events:

```go
var pauseState workflow.PauseState // set if the workflow pauses; pass to engine.Resume later

for event := range result.EventStream {
	switch event.Type {
	case workflow.EventStepStart:
		fmt.Printf("Step started: %s\n", event.StepID)
	case workflow.EventStepComplete:
		fmt.Printf("Step completed: %s\n", event.StepID)
	case workflow.EventVariableUpdate:
		fmt.Printf("Variable updated: %v\n", event.Data)
	case workflow.EventPaused:
		// Save state for later resume
		pauseState = event.Data["state"].(workflow.PauseState)
	case workflow.EventLLMStream:
		chunk := event.Data["chunk"].(string)
		fmt.Print(chunk)
	}
}
```

## Registry

The registry declares all external dependencies:

```go
Registry: workflow.Registry{
	// Service signatures (VL format)
	Services: []string{
		"ServiceName(param1(TYPE1), param2(TYPE2)) RETURN result1(TYPE1), result2(TYPE2)",
	},

	// Component IDs
	Components: []string{"FileOps", "MCP_Search"},

	// Global variables (VL format)
	Vars: []string{
		"$varName(TYPE)",
		"$items([OBJECT])",
	},

	// File access boundaries
	Files: workflow.FilesRegistry{
		Inputs:    []string{"Process/Input/*"},     // Read-only
		Artifacts: []string{"Process/Artifacts/*"}, // Writable
	},
},
```

## Built-in Adapters

### S3 File Adapter

For AWS S3 storage (or S3-compatible services like MinIO):

```go
import (
	"context"

	"workflow"
)

// Inside your setup function:
ctx := context.Background()

// Create S3 file adapter
s3Adapter, err := workflow.NewS3FileAdapter(ctx, workflow.S3Config{
	Bucket: "my-workflow-bucket",
	Prefix: "workflows/", // Optional key prefix
	Region: "us-east-1",

	// Optional: custom endpoint for MinIO/LocalStack
	// Endpoint:     "http://localhost:9000",
	// UsePathStyle: true,

	// Optional: explicit credentials (otherwise uses the AWS credential chain)
	// AccessKeyID:     "access-key",
	// SecretAccessKey: "secret-key",
})
if err != nil {
	panic(err)
}

adapters := &workflow.Adapters{
	Service:   serviceAdapter,
	Component: componentAdapter,
	LLM:       llmAdapter,
	File:      s3Adapter,
}
```

### LiteLLM Adapter

For LLM calls through a LiteLLM proxy server:

```go
// Create LiteLLM adapter (Timeout uses the standard "time" package)
liteLLMAdapter := workflow.NewLiteLLMAdapter(workflow.LiteLLMConfig{
	BaseURL:       "http://localhost:4000",
	APIKey:        "your-api-key",      // Optional: sent in Authorization header
	RequestAPIKey: "user-provider-key", // Optional: sent as "api_key" in request body (BYOK)
	Timeout:       5 * time.Minute,     // Optional (default: 5 minutes)
})

adapters := &workflow.Adapters{
	Service:   serviceAdapter,
	Component: componentAdapter,
	LLM:       liteLLMAdapter,
	File:      fileAdapter,
}
```

**Configuration options:**

| Field | Description |
|-------|-------------|
| `BaseURL` | LiteLLM server URL (default: `http://localhost:4000`) |
| `APIKey` | Sent in the `Authorization: Bearer <APIKey>` header to authenticate with the LiteLLM proxy |
| `RequestAPIKey` | Sent as `api_key` in the JSON request body for BYOK (Bring Your Own Key) passthrough to downstream providers |
| `Model` | Optional model override (replaces the model in all LLM requests) |
| `CacheControl` | Optional `cache_control` override for Anthropic models |
| `Timeout` | HTTP client timeout (default: 5 minutes) |

## Custom Adapters

Implement custom adapters for your infrastructure:

```go
// Uses bytes, context, encoding/json, fmt, net/http, and workflow.
type MyServiceAdapter struct {
	client *http.Client
}

func (a *MyServiceAdapter) Call(ctx context.Context, serviceName string, params map[string]interface{}) (*workflow.ServiceResult, error) {
	// Encode the step inputs as the JSON request body
	body, err := json.Marshal(params)
	if err != nil {
		return nil, fmt.Errorf("encode params: %w", err)
	}
	// Bind the request to ctx so cancellation reaches the HTTP call
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		"https://api.example.com/"+serviceName, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := a.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("call %s: %w", serviceName, err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("call %s: unexpected status %s", serviceName, resp.Status)
	}

	// Decode the JSON response into the result data map
	var resultData map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&resultData); err != nil {
		return nil, fmt.Errorf("decode %s response: %w", serviceName, err)
	}
	return &workflow.ServiceResult{Data: resultData}, nil
}
```

## Testing

Run the examples:

```bash
go test -v ./...
```

Example tests demonstrate:

- Simple workflows
- Loop execution (serial and parallel)
- Pause/resume patterns
- JSON workflow loading
- Branch conditions

## Best Practices

1. **Declare before use**: All services, components, and variables must be declared in the registry
2. **Use descriptive IDs**: Prefix step IDs with their type (`Service_`, `LLM_`, etc.)
3. **Validate early**: Call `workflow.Validate()` before execution
4. **Handle events**: Process the event stream to track progress
5. **Externalize state**: Save pause states for crash recovery
6. **Test adapters**: Implement proper error handling in custom adapters

## Advanced Topics

### Parallel Execution

Children are executed in parallel:

```go
{
	ID:       "Service_Parent",
	Children: []string{"Service_A", "Service_B", "Service_C"},
	Next:     "Service_AfterJoin", // Waits for all children to complete
}
```

### Loop Variables

Access the loop context with `_item` and `_index`:

```go
{
	ID:     "Set_ProcessItem",
	Target: "$results[_index]",
	Value:  "=_item.value",
}
```

### Deep Path Access

Navigate nested structures:

```go
"$user.profile.email"
"$items[0].name"
"$data[_index].field"
```

### Status Patch Protocol

Services can update workflow state:

```go
StatusPatch: &workflow.StatusPatch{
	Vars: map[string]interface{}{
		"$approval.status":    "approved",
		"$approval.timestamp": time.Now(),
	},
	Control: workflow.ControlDirective{
		Pause: false,
	},
},
```

## API Reference

### Core Types

- `Workflow` - Top-level workflow definition
- `Registry` - External dependencies declaration
- `Step` - Individual workflow step
- `ExecutionContext` - Runtime state
- `Engine` - Workflow executor

### Execution Status

- `StatusRunning` - Workflow is currently executing
- `StatusCompleted` - Workflow completed (reached end naturally)
- `StatusPaused` - Workflow paused (via a `Pause_*` step or `StatusPatch`)
- `StatusStopped` - Workflow stopped (via a `Stop_*` step)
- `StatusFailed` - Workflow failed due to an error

### Adapters

- `ServiceAdapter` - Service call interface
- `ComponentAdapter` - Component call interface
- `LLMAdapter` - LLM call interface
- `FileAdapter` - File operation interface

### Events

- `EventStepStart` - Step execution started
- `EventStepComplete` - Step execution completed
- `EventStepError` - Step execution failed
- `EventVariableUpdate` - Variable value changed
- `EventFileWrite` - File written to artifacts
- `EventPaused` - Workflow paused (via a `Pause_*` step or `StatusPatch`)
- `EventResumed` - Workflow resumed
- `EventCompleted` - Workflow completed (reached end or a `Stop_*` step)
- `EventFailed` - Workflow failed due to an error
- `EventLLMStream` - LLM streaming chunk

## Known Limitations

### S3 File Adapter

- **Non-atomic `WriteModeFailIfExists`**: The check-then-write pattern for `WriteModeFailIfExists` is not atomic. Another process could create the file between the `Exists` check and the `PutObject` call. The adapter does not use S3 conditional writes (an `If-None-Match` precondition on `PutObject`). AWS S3 now supports this, but not every S3-compatible service does. Other workarounds, such as S3 Object Lock or versioning, change API semantics.
- **No retry logic**: The adapter doesn't implement retry with exponential backoff for transient S3 failures (network issues, throttling). Consider wrapping calls with your own retry logic for production use.

### LiteLLM Adapter

- **No retry logic**: The adapter doesn't implement retry for transient HTTP failures. For production use, consider implementing retry with exponential backoff.
- **Single HTTP client timeout**: The HTTP client timeout applies to the entire request, including reading the response body. A streaming response that runs longer than `Timeout` is therefore cut off, and individual chunk reads don't have separate timeouts.

## License

This library is provided as-is for workflow orchestration purposes.

## Contributing

The library follows the v2.3 Workflow specification. For spec updates or clarifications, refer to the original specification document.

## Support

For issues, questions, or contributions, please refer to the project repository.