v2.3 Workflow Engine - Go Library

A comprehensive Go implementation of the v2.3 Workflow specification for orchestrating multi-step processes, with support for pause/resume, parallel execution, and flexible control flow.

Overview

This library implements a stateless workflow orchestration engine that supports:

  • Multi-step Output: Real-time streaming of events and progress
  • Pause/Resume: Save workflow state and resume from any point
  • Control Flow: Branching, looping, parallel execution
  • Flexible Execution: Support for services, components, LLM calls, and custom logic
  • Stateless Design: All state is externalized for easy persistence
  • Type Safety: Strong typing with validation

Recent Updates

Latest

  • Value Evaluation Rules: New = prefix convention for distinguishing literal values from expressions in step inputs, outputs, and values. Strings without = are treated as literals; strings with = prefix are evaluated as expressions; == prefix escapes a literal =.
  • LiteLLM BYOK Support: New RequestAPIKey config option sends an api_key field in the JSON request body for Bring Your Own Key passthrough to downstream providers.
  • Doc Adapter: Inject semantic documents into LLM system prompts via docs parameter and registry declarations.

v2.3 Release

  • S3 File Adapter: AWS S3 storage support for workflow files and artifacts (compatible with MinIO and other S3-compatible services)
  • LiteLLM Adapter: Integration with LiteLLM proxy for unified LLM API access across multiple providers
  • Explicit Pause Steps: New Pause_* step type for explicit workflow pausing (in addition to service-triggered pauses)
  • Improved Completion Behavior: Steps without next field are treated as completed (not paused), allowing child branches to complete naturally
  • Bug Fixes:
    • Fixed error type checking in adapters
    • URL-encode S3 CopySource for special characters
    • Increased SSE scanner buffer to 1MB for large LLM responses
  • Documentation: Added known limitations section for adapter implementations
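
The SSE scanner buffer fix listed above reflects a general property of Go's bufio.Scanner: by default it rejects any line longer than 64 KB (bufio.MaxScanTokenSize), which a single large SSE data line can exceed. The sketch below is not the adapter's code; readSSEData and sampleStream are hypothetical names, used only to show how raising the limit with Scanner.Buffer changes the outcome.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

const (
	defaultLimit = bufio.MaxScanTokenSize // 64 KB, the scanner's default cap
	raisedLimit  = 1024 * 1024            // 1 MB, as in the release note
)

// sampleStream builds one SSE event whose data line carries n payload bytes.
func sampleStream(n int) string {
	return "data: " + strings.Repeat("x", n) + "\n\n"
}

// readSSEData returns the payload of every "data: " line in the stream.
// maxLine is the longest line the scanner will accept.
func readSSEData(stream string, maxLine int) ([]string, error) {
	scanner := bufio.NewScanner(strings.NewReader(stream))
	scanner.Buffer(make([]byte, 0, 64*1024), maxLine)
	var payloads []string
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "data: ") {
			payloads = append(payloads, strings.TrimPrefix(line, "data: "))
		}
	}
	return payloads, scanner.Err()
}

func main() {
	stream := sampleStream(200 * 1024)

	_, err := readSSEData(stream, defaultLimit)
	fmt.Println("default limit error:", err)

	payloads, err := readSSEData(stream, raisedLimit)
	fmt.Println("raised limit:", len(payloads), "payload(s), error:", err)
}
```

With the default limit the scanner stops with bufio.ErrTooLong; with the 1 MB limit the 200 KB line is returned whole.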

Architecture

Core Components

  1. Engine - Main workflow execution orchestrator
  2. Registry - Declares external dependencies and boundaries
  3. Expression Evaluator - Evaluates workflow expressions and variable paths
  4. Adapters - Interfaces for external services, components, LLMs, and file operations
  5. Steps - Various node types for different operations

Installation

go get -u workflow

Optional Dependencies

For S3 file storage support:

go get github.com/aws/aws-sdk-go-v2
go get github.com/aws/aws-sdk-go-v2/config
go get github.com/aws/aws-sdk-go-v2/service/s3

The LiteLLM adapter uses only the standard library HTTP client, so it needs no additional dependencies.

Quick Start

1. Define a Workflow

package main

import (
    "context"
    "fmt"
    "workflow"
)

func main() {
    // Define workflow structure
    wf := &workflow.Workflow{
        Version: "2.3",
        Name:    "Simple Task Workflow",
        Registry: workflow.Registry{
            Services: []string{
                "TaskService(input(STRING)) RETURN result(OBJECT)",
            },
            Components: []string{},
            Vars: []string{
                "$input(STRING)",
                "$result(OBJECT)",
            },
            Files: workflow.FilesRegistry{
                Inputs:    []string{},
                Artifacts: []string{"Process/Artifacts/*"},
            },
        },
        Steps: []workflow.Step{
            {
                ID: "Service_TaskService",
                In: workflow.StepInput{
                    "input": "=$input",
                },
                Out: workflow.StepOutput{
                    "$result": "=_result.result",
                },
                Next: "Stop_End",
            },
            {
                ID: "Stop_End",
            },
        },
    }

    // Setup and execute...
}

2. Create Adapters

// Create service adapter
serviceAdapter := workflow.NewDefaultServiceAdapter()
serviceAdapter.RegisterHandler("TaskService", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) {
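    // Use the comma-ok form so a missing or non-string input returns an error instead of panicking
    input, ok := params["input"].(string)
    if !ok {
        return nil, fmt.Errorf("TaskService: input must be a string, got %T", params["input"])
    }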
    return &workflow.ServiceResult{
        Data: map[string]interface{}{
            "result": map[string]interface{}{
                "status": "completed",
                "message": fmt.Sprintf("Processed: %s", input),
            },
        },
    }, nil
})

// Create adapters bundle (using default adapters)
adapters := &workflow.Adapters{
    Service:   serviceAdapter,
    Component: workflow.NewDefaultComponentAdapter(),
    LLM:       workflow.NewDefaultLLMAdapter(),
    File:      workflow.NewDefaultFileAdapter(),
}

// Or use specialized adapters (S3 for files, LiteLLM for LLM calls)
// See "Built-in Adapters" section below for S3FileAdapter and LiteLLMAdapter

3. Execute Workflow

// Create engine
engine, err := workflow.NewEngine(wf)
if err != nil {
    panic(err)
}

// Execute with initial variables
initialVars := map[string]interface{}{
    "$input": "Hello, Workflow!",
}

result, err := engine.Execute(context.Background(), initialVars, adapters)
if err != nil {
    panic(err)
}

// Process events
for event := range result.EventStream {
    fmt.Printf("Event: %s - %s\n", event.Type, event.StepID)
}

Step Types

Service Steps (Service_*)

Call project services with input/output mapping.

{
    ID: "Service_MyService",
    In: workflow.StepInput{
        "param1": "=$variable1",      // expression: evaluates $variable1
        "param2": "literal value",     // literal string (no = prefix)
        "param3": true,                // literal bool
    },
    Out: workflow.StepOutput{
        "$result": "=_result.data",
    },
    Next: "NextStep",
}

Component Steps (Component_*)

Call system components (MCP, file operations, etc.).

{
    ID: "Component_FileOps",
    In: workflow.StepInput{
        "operation": "read",
        "path":      "=$filePath",
    },
    Out: workflow.StepOutput{
        "$fileContent": "=_result.content",
    },
    Next: "NextStep",
}

LLM Steps (LLM_*)

Call large language models with streaming support.

{
    ID: "LLM_Generate",
    In: workflow.StepInput{
        "model":  "gpt-4",   // literal string
        "stream": true,       // literal bool
        "messages": []interface{}{
            map[string]interface{}{
                "role":    "user",
                "content": "=$prompt",
            },
        },
    },
    Out: workflow.StepOutput{
        "$generatedText": "=_result.content",
    },
    Next: "NextStep",
}

Set Steps (Set_*)

Update global variables.

{
    ID:     "Set_UpdateStatus",
    Target: "$status",
    Value:  "completed",       // literal string (no = prefix needed)
    Next:   "NextStep",
}

// To set a variable from an expression:
{
    ID:     "Set_Total",
    Target: "$total",
    Value:  "=$price * $quantity",  // expression (= prefix)
    Next:   "NextStep",
}

Write Steps (Write_*)

Write files to artifacts directory.

{
    ID:     "Write_Output",
    Target: "Process/Artifacts/output.txt", // literal path (no = prefix)
    Value:  "=$content",                    // expression (= prefix)
    Mode:   "overwrite",                    // or "append", "failIfExists"
    Next:   "NextStep",
}
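
As a rough illustration of the three Mode values, here is one way a local-filesystem writer could map them onto os.OpenFile flags. This is a sketch under assumptions, not the library's File adapter: writeFile and demo are hypothetical names, and only the mode strings come from the step definition above.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeFile is a hypothetical writer; mode mirrors the Write step's Mode field.
func writeFile(path, content, mode string) error {
	flags := os.O_WRONLY | os.O_CREATE
	switch mode {
	case "overwrite":
		flags |= os.O_TRUNC // discard any existing content
	case "append":
		flags |= os.O_APPEND // always write at the end
	case "failIfExists":
		flags |= os.O_EXCL // fail if the file is already there
	default:
		return fmt.Errorf("unknown write mode %q", mode)
	}
	f, err := os.OpenFile(path, flags, 0o644)
	if err != nil {
		return err
	}
	if _, err := f.WriteString(content); err != nil {
		f.Close()
		return err
	}
	return f.Close()
}

// demo writes "a" (overwrite), appends "b", then attempts a failIfExists write.
// It returns the final file content and whether the last write was rejected.
func demo() (string, bool, error) {
	dir, err := os.MkdirTemp("", "write-modes")
	if err != nil {
		return "", false, err
	}
	defer os.RemoveAll(dir)

	target := filepath.Join(dir, "output.txt")
	if err := writeFile(target, "a", "overwrite"); err != nil {
		return "", false, err
	}
	if err := writeFile(target, "b", "append"); err != nil {
		return "", false, err
	}
	rejected := writeFile(target, "c", "failIfExists") != nil

	data, err := os.ReadFile(target)
	return string(data), rejected, err
}

func main() {
	content, rejected, err := demo()
	fmt.Println(content, rejected, err)
}
```

On a local filesystem O_EXCL makes the failIfExists check and the create a single operation; object stores may behave differently.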

Branch Steps (Branch_*)

Conditional branching based on expressions.

{
    ID: "Branch_CheckAmount",
    Cases: [][]string{
        {"$amount < 1000", "Service_AutoApprove"},
        {"$amount < 10000", "Service_ManagerApprove"},
        {"ELSE", "Service_DirectorApprove"},
    },
    Next: "Stop_End",
}
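
The ordering of the cases above suggests first-match semantics: the first condition that holds wins, and ELSE catches everything else. The sketch below assumes exactly that; pickBranch and demoTarget are hypothetical, and the evaluator is hard-coded to the two conditions from the example rather than parsing expressions.

```go
package main

import "fmt"

// pickBranch returns the target of the first case whose condition holds.
// eval stands in for the engine's expression evaluator.
func pickBranch(cases [][]string, eval func(expr string) (bool, error)) (string, error) {
	for _, c := range cases {
		if len(c) != 2 {
			return "", fmt.Errorf("malformed case %v", c)
		}
		if c[0] == "ELSE" {
			return c[1], nil
		}
		ok, err := eval(c[0])
		if err != nil {
			return "", err
		}
		if ok {
			return c[1], nil
		}
	}
	return "", nil // no case matched and no ELSE was given
}

// demoTarget routes an amount through the Branch_CheckAmount cases.
func demoTarget(amount int) (string, error) {
	cases := [][]string{
		{"$amount < 1000", "Service_AutoApprove"},
		{"$amount < 10000", "Service_ManagerApprove"},
		{"ELSE", "Service_DirectorApprove"},
	}
	eval := func(expr string) (bool, error) {
		switch expr {
		case "$amount < 1000":
			return amount < 1000, nil
		case "$amount < 10000":
			return amount < 10000, nil
		}
		return false, fmt.Errorf("unsupported expression %q", expr)
	}
	return pickBranch(cases, eval)
}

func main() {
	for _, amount := range []int{500, 5000, 50000} {
		target, err := demoTarget(amount)
		fmt.Println(amount, target, err)
	}
}
```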

Loop Steps (Loop_*)

Iterate over collections with serial or parallel execution.

{
    ID:       "Loop_ProcessItems",
    Mode:     "parallel", // or "serial"
    Source:   "$items",
    Children: []string{"Service_ProcessOne"},
    Next:     "Stop_End",
}

Stop Steps (Stop_*)

Explicitly terminate workflow execution.

{
    ID: "Stop_End",
}

Pause Steps (Pause_*)

Explicitly pause workflow execution, waiting for external resume.

{
    ID: "Pause_WaitApproval",
}

Note: Workflows only pause when:

  1. Reaching a Pause_* step
  2. A Service returns StatusPatch.Control.Pause = true

When a step has no next, the branch is considered completed (not paused). Child branches ending without a next are considered complete, allowing the parent to continue.

Features

Value Evaluation Rules

Step values (In, Out, Value, and Target for Write steps) follow these rules:

| Value | Treatment | Result |
|-------|-----------|--------|
| Non-string JSON types (bool, number, null, object, array) | Literal | Returned as-is |
| String without = prefix | Literal string | "hello" → "hello" |
| String with = prefix | Expression | "=$name" → evaluates $name |
| String with == prefix | Escaped literal | "==foo" → "=foo" |

// Literal values (no = prefix)
"hello world"          // literal string "hello world"
"Process/output.txt"   // literal string path

// Expression values (= prefix)
"=$variable"           // evaluates variable reference
"=$object.field"       // evaluates nested path
"=$array[0]"           // evaluates array index
"=$age + 5"            // evaluates arithmetic
"=$status == \"done\"" // evaluates comparison

// Escaped literal (== prefix, strips one =)
"==something"          // literal string "=something"

// Non-string types are always literal
true                   // literal bool
42                     // literal number
map[string]interface{}{...} // literal object

Note: If conditions, Source expressions, and Cases expressions are always evaluated as expressions directly (no = prefix needed).
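
The prefix rules can be condensed into a small dispatch function. The following is a minimal sketch, assuming a single value is resolved at a time (it does not recurse into objects or arrays); resolveValue, evalFunc, and lookupOnly are hypothetical names, and the toy evaluator only resolves whole-variable references.

```go
package main

import (
	"fmt"
	"strings"
)

// evalFunc stands in for the engine's expression evaluator.
type evalFunc func(expr string) (interface{}, error)

// resolveValue applies the = prefix rules to one step value.
func resolveValue(v interface{}, eval evalFunc) (interface{}, error) {
	s, ok := v.(string)
	if !ok {
		return v, nil // non-string types are always literal
	}
	switch {
	case strings.HasPrefix(s, "=="):
		return s[1:], nil // escaped literal: strip one '='
	case strings.HasPrefix(s, "="):
		return eval(s[1:]) // expression: evaluate what follows '='
	default:
		return s, nil // plain literal string
	}
}

// lookupOnly is a toy evaluator that only resolves whole-variable references.
func lookupOnly(vars map[string]interface{}) evalFunc {
	return func(expr string) (interface{}, error) {
		val, ok := vars[expr]
		if !ok {
			return nil, fmt.Errorf("unknown variable %q", expr)
		}
		return val, nil
	}
}

func main() {
	eval := lookupOnly(map[string]interface{}{"$name": "Ada"})
	for _, v := range []interface{}{"hello", "=$name", "==foo", 42, true} {
		got, err := resolveValue(v, eval)
		fmt.Printf("%#v -> %#v (err: %v)\n", v, got, err)
	}
}
```

Checking the == case before the = case matters, since every escaped literal also starts with a single =.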

Expression Language

The expression language (used after the = prefix, or directly in If/Source/Cases) supports:

// Variable references
"$variable"
"$object.field"
"$array[0]"
"$array[_index]"

// Comparisons
"$amount > 1000"
"$status == \"approved\""
"$count >= 10"

// Logical operators
"$approved == true && $amount < 5000"
"$error != null || $retry > 3"

// Arithmetic
"$total + 100"
"$price * $quantity"
"\"Result: \" + $value"

// Literals (within expressions)
"\"string literal\""
"42"
"true"
"false"

Variables

Three types of variables:

  1. Global Variables ($xxx): Declared in registry, accessible throughout workflow
  2. System Variables (SYSVAR.xxx): Read-only configuration from system
  3. Local Variables (_item, _index, _result): Loop and step-scoped variables
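
To make references such as $object.field and $array[0] concrete, the sketch below walks a path through nested maps and slices. It is an illustration under assumptions, not the library's evaluator: lookup and sampleVars are hypothetical, variables are stored under their $-prefixed names, and only numeric indexes are handled (an index like [_index] would also need the loop's local variables).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// lookup resolves a path such as "$items[0].name" against vars.
func lookup(vars map[string]interface{}, path string) (interface{}, error) {
	var cur interface{} = vars
	rest := path
	for rest != "" {
		switch rest[0] {
		case '.':
			rest = rest[1:] // separator between field names
		case '[':
			end := strings.IndexByte(rest, ']')
			if end < 0 {
				return nil, fmt.Errorf("unclosed '[' in %q", path)
			}
			i, err := strconv.Atoi(rest[1:end])
			if err != nil {
				return nil, fmt.Errorf("bad index in %q: %w", path, err)
			}
			list, ok := cur.([]interface{})
			if !ok {
				return nil, fmt.Errorf("indexing a non-list in %q", path)
			}
			if i < 0 || i >= len(list) {
				return nil, fmt.Errorf("index %d out of range in %q", i, path)
			}
			cur, rest = list[i], rest[end+1:]
		default:
			end := strings.IndexAny(rest, ".[")
			if end < 0 {
				end = len(rest)
			}
			obj, ok := cur.(map[string]interface{})
			if !ok {
				return nil, fmt.Errorf("field %q on a non-object in %q", rest[:end], path)
			}
			val, ok := obj[rest[:end]]
			if !ok {
				return nil, fmt.Errorf("missing key %q in %q", rest[:end], path)
			}
			cur, rest = val, rest[end:]
		}
	}
	return cur, nil
}

// sampleVars returns a small variable set for the demo.
func sampleVars() map[string]interface{} {
	return map[string]interface{}{
		"$user": map[string]interface{}{
			"profile": map[string]interface{}{"email": "ada@example.com"},
		},
		"$items": []interface{}{
			map[string]interface{}{"name": "first"},
		},
	}
}

func main() {
	for _, p := range []string{"$user.profile.email", "$items[0].name", "$items[3].name"} {
		v, err := lookup(sampleVars(), p)
		fmt.Println(p, "=>", v, err)
	}
}
```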

Conditional Execution

Use the if property to conditionally execute steps:

{
    ID: "Service_Notify",
    If: "$shouldNotify == true",       // If is always an expression (no = prefix)
    In: workflow.StepInput{
        "message": "=$notificationText", // In values use = prefix for expressions
    },
    Next: "Stop_End",
}

Pause and Resume

Workflows can pause for external input (e.g., human approval). Pausing requires explicit action:

Option 1: Use a Pause_* step

{
    ID: "Pause_WaitApproval",
}

Option 2: Service can trigger pause via StatusPatch

return &workflow.ServiceResult{
    Data: map[string]interface{}{
        "status": "pending",
    },
    StatusPatch: &workflow.StatusPatch{
        Vars: map[string]interface{}{
            "$approvalStatus": "pending",
        },
        Control: workflow.ControlDirective{
            Pause:  true,
            Reason: "WAIT_APPROVAL",
        },
    },
}

Resume later with updated state:

resumeInput := &workflow.ResumeInput{
    State: pauseState, // From paused event
    Patch: map[string]interface{}{
        "$approved": true,
        "$approvalStatus": "approved",
    },
}

result, err := engine.Resume(context.Background(), resumeInput, adapters)
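
Because the engine keeps no state of its own, the caller has to store the pause state between the paused event and the later Resume call. The sketch below shows one way to do that with a JSON file. The savedState struct is a hypothetical stand-in (the real pause state type and its fields are not shown in this document), as are saveState, loadState, and roundTrip.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// savedState is a hypothetical stand-in for the engine's pause state.
type savedState struct {
	PausedAt string                 `json:"paused_at"`
	Vars     map[string]interface{} `json:"vars"`
}

// saveState writes the state as JSON, readable only by the current user.
func saveState(path string, s savedState) error {
	data, err := json.Marshal(s)
	if err != nil {
		return err
	}
	return os.WriteFile(path, data, 0o600)
}

// loadState reads a state previously written by saveState.
func loadState(path string) (savedState, error) {
	var s savedState
	data, err := os.ReadFile(path)
	if err != nil {
		return s, err
	}
	err = json.Unmarshal(data, &s)
	return s, err
}

// roundTrip saves a state to a temporary file and loads it back.
func roundTrip(s savedState) (savedState, error) {
	dir, err := os.MkdirTemp("", "pause-state")
	if err != nil {
		return savedState{}, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "state.json")
	if err := saveState(path, s); err != nil {
		return savedState{}, err
	}
	return loadState(path)
}

func main() {
	in := savedState{
		PausedAt: "Pause_WaitApproval",
		Vars:     map[string]interface{}{"$approvalStatus": "pending", "$amount": 1200},
	}
	out, err := roundTrip(in)
	fmt.Println(out.PausedAt, out.Vars["$approvalStatus"], out.Vars["$amount"], err)
}
```

One thing to watch with a JSON round trip: numbers stored in an interface{} come back as float64, so integer variables may need converting after loading.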

Important: Steps without a next field are considered completed, not paused. This allows child branches to complete naturally without pausing the entire workflow.

Event Stream

Real-time workflow events:

for event := range result.EventStream {
    switch event.Type {
    case workflow.EventStepStart:
        fmt.Printf("Step started: %s\n", event.StepID)
    case workflow.EventStepComplete:
        fmt.Printf("Step completed: %s\n", event.StepID)
    case workflow.EventVariableUpdate:
        fmt.Printf("Variable updated: %v\n", event.Data)
    case workflow.EventPaused:
        // Keep the state for a later Resume; pauseState is assumed to be declared before the loop
        if state, ok := event.Data["state"].(workflow.PauseState); ok {
            pauseState = state
        }
    case workflow.EventLLMStream:
        chunk := event.Data["chunk"].(string)
        fmt.Print(chunk)
    }
}

Registry

The registry declares all external dependencies:

Registry: workflow.Registry{
    // Service signatures (VL format)
    Services: []string{
        "ServiceName(param1(TYPE1), param2(TYPE2)) RETURN result1(TYPE1), result2(TYPE2)",
    },

    // Component IDs
    Components: []string{"FileOps", "MCP_Search"},

    // Global variables (VL format)
    Vars: []string{
        "$varName(TYPE)",
        "$items([OBJECT])",
    },

    // File access boundaries
    Files: workflow.FilesRegistry{
        Inputs:    []string{"Process/Input/*"},    // Read-only
        Artifacts: []string{"Process/Artifacts/*"}, // Writable
    },
}
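
The variable declarations above follow a compact name(TYPE) shape, with square brackets marking a list. The sketch below parses just the two forms shown here. It is an assumption-laden illustration: the full VL grammar is not given in this document, so parseVarDecl, varDecl, and the regular expression are hypothetical and may reject forms the real parser accepts.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// varDecl is the parsed form of a declaration such as "$items([OBJECT])".
type varDecl struct {
	Name   string // variable name without the leading "$"
	Type   string // declared type, or the element type for lists
	IsList bool
}

// declPattern assumes word-character names and upper-case type names.
var declPattern = regexp.MustCompile(`^\$(\w+)\((\[[A-Z]+\]|[A-Z]+)\)$`)

// parseVarDecl parses one entry of the registry's Vars list.
func parseVarDecl(decl string) (varDecl, error) {
	m := declPattern.FindStringSubmatch(strings.TrimSpace(decl))
	if m == nil {
		return varDecl{}, fmt.Errorf("invalid variable declaration %q", decl)
	}
	typ := m[2]
	if strings.HasPrefix(typ, "[") {
		return varDecl{Name: m[1], Type: strings.Trim(typ, "[]"), IsList: true}, nil
	}
	return varDecl{Name: m[1], Type: typ}, nil
}

func main() {
	for _, d := range []string{"$input(STRING)", "$items([OBJECT])", "items(OBJECT)"} {
		decl, err := parseVarDecl(d)
		fmt.Printf("%q -> %+v (err: %v)\n", d, decl, err)
	}
}
```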

Built-in Adapters

S3 File Adapter

For AWS S3 storage (or S3-compatible services like MinIO):

import (
    "context"
    "workflow"
)

// Create S3 file adapter
ctx := context.Background()
s3Adapter, err := workflow.NewS3FileAdapter(ctx, workflow.S3Config{
    Bucket:   "my-workflow-bucket",
    Prefix:   "workflows/", // Optional key prefix
    Region:   "us-east-1",
    // Optional: Custom endpoint for MinIO/LocalStack
    // Endpoint:     "http://localhost:9000",
    // UsePathStyle: true,
    // Optional: Explicit credentials (otherwise uses AWS credential chain)
    // AccessKeyID:     "access-key",
    // SecretAccessKey: "secret-key",
})
if err != nil {
    panic(err)
}

adapters := &workflow.Adapters{
    Service:   serviceAdapter,
    Component: componentAdapter,
    LLM:       llmAdapter,
    File:      s3Adapter,
}

LiteLLM Adapter

For LLM calls through a LiteLLM proxy server:

// Create LiteLLM adapter
liteLLMAdapter := workflow.NewLiteLLMAdapter(workflow.LiteLLMConfig{
    BaseURL:       "http://localhost:4000",
    APIKey:        "your-api-key",      // Optional: sent in Authorization header
    RequestAPIKey: "user-provider-key", // Optional: sent as "api_key" in request body (BYOK)
    Timeout:       5 * time.Minute,     // Optional (default: 5 minutes)
})

adapters := &workflow.Adapters{
    Service:   serviceAdapter,
    Component: componentAdapter,
    LLM:       liteLLMAdapter,
    File:      fileAdapter,
}

Configuration options:

| Field | Description |
|-------|-------------|
| BaseURL | LiteLLM server URL (default: http://localhost:4000) |
| APIKey | Sent in the Authorization: Bearer <key> header to authenticate with the LiteLLM proxy |
| RequestAPIKey | Sent as api_key in the JSON request body for BYOK (Bring Your Own Key) passthrough to downstream providers |
| Model | Optional model override (overwrites model in all LLM requests) |
| CacheControl | Optional cache_control override for Anthropic models |
| Timeout | HTTP client timeout (default: 5 minutes) |

Custom Adapters

Implement custom adapters for your infrastructure:

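// Requires imports: bytes, context, encoding/json, fmt, net/http
type MyServiceAdapter struct {
    client *http.Client
}

// Call POSTs params as JSON and decodes the reply (assumed here to be a JSON object).
func (a *MyServiceAdapter) Call(ctx context.Context, serviceName string, params map[string]interface{}) (*workflow.ServiceResult, error) {
    body, err := json.Marshal(params)
    if err != nil {
        return nil, fmt.Errorf("encode params: %w", err)
    }
    req, err := http.NewRequestWithContext(ctx, http.MethodPost,
        "https://api.example.com/"+serviceName, bytes.NewReader(body))
    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/json")
    resp, err := a.client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    if resp.StatusCode >= 300 {
        return nil, fmt.Errorf("%s: unexpected status %s", serviceName, resp.Status)
    }
    var resultData map[string]interface{}
    if err := json.NewDecoder(resp.Body).Decode(&resultData); err != nil {
        return nil, fmt.Errorf("decode response: %w", err)
    }
    return &workflow.ServiceResult{Data: resultData}, nil
}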

Testing

Run the example tests:

go test -v ./...

Example tests demonstrate:

  • Simple workflows
  • Loop execution (serial and parallel)
  • Pause/resume patterns
  • JSON workflow loading
  • Branch conditions

Best Practices

  1. Declare before use: All services, components, and variables must be declared in the registry
  2. Use descriptive IDs: Prefix step IDs with type (Service, LLM, etc.)
  3. Validate early: Call workflow.Validate() before execution
  4. Handle events: Process the event stream to track progress
  5. Externalize state: Save pause states for crash recovery
  6. Test adapters: Implement proper error handling in custom adapters

Advanced Topics

Parallel Execution

Children are executed in parallel:

{
    ID: "Service_Parent",
    Children: []string{"Service_A", "Service_B", "Service_C"},
    Next: "Service_AfterJoin", // Waits for all children to complete
}
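
The join behavior described above (Next runs only after every child has finished) is commonly built on sync.WaitGroup. The sketch below shows that pattern in isolation. It is not the engine's scheduler: runChildren and demoJoin are hypothetical, and collecting all child errors with errors.Join (Go 1.20+) is an assumption about how failures might be reported.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// runChildren starts every child in its own goroutine and returns only
// after all of them have finished, combining any errors they returned.
func runChildren(ctx context.Context, ids []string, run func(context.Context, string) error) error {
	errs := make([]error, len(ids)) // one slot per child, so no locking is needed
	var wg sync.WaitGroup
	for i, id := range ids {
		wg.Add(1)
		go func(i int, id string) {
			defer wg.Done()
			errs[i] = run(ctx, id)
		}(i, id)
	}
	wg.Wait() // the join point: the parent's Next would run after this
	return errors.Join(errs...)
}

// demoJoin runs three children; the one named failID (if any) returns an error.
// It reports how many children ran and the combined error.
func demoJoin(failID string) (int32, error) {
	var finished int32
	ids := []string{"Service_A", "Service_B", "Service_C"}
	err := runChildren(context.Background(), ids, func(_ context.Context, id string) error {
		atomic.AddInt32(&finished, 1)
		if id == failID {
			return fmt.Errorf("%s failed", id)
		}
		return nil
	})
	return finished, err
}

func main() {
	n, err := demoJoin("")
	fmt.Println("children run:", n, "error:", err)

	n, err = demoJoin("Service_B")
	fmt.Println("children run:", n, "error:", err)
}
```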

Loop Variables

Access loop context with _item and _index:

{
    ID:     "Set_ProcessItem",
    Target: "$results[_index]",
    Value:  "=_item.value",
}
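
One way to picture _item and _index is as a small local scope that is rebuilt for each iteration and layered over the global variables. The sketch below follows that model for a serial loop. It is a hypothetical illustration (scope, forEach, and collectValues are not library names) of what the Set_ProcessItem step above would compute.

```go
package main

import "fmt"

// scope layers loop-local names over the global variables.
type scope struct {
	globals map[string]interface{}
	locals  map[string]interface{}
}

// get resolves "$name" from globals and anything else (such as "_item") from locals.
func (s scope) get(name string) (interface{}, bool) {
	if len(name) > 0 && name[0] == '$' {
		v, ok := s.globals[name]
		return v, ok
	}
	v, ok := s.locals[name]
	return v, ok
}

// forEach runs body once per item with fresh _item and _index locals.
func forEach(globals map[string]interface{}, items []interface{}, body func(scope) error) error {
	for i, item := range items {
		sc := scope{
			globals: globals,
			locals:  map[string]interface{}{"_item": item, "_index": i},
		}
		if err := body(sc); err != nil {
			return fmt.Errorf("iteration %d: %w", i, err)
		}
	}
	return nil
}

// collectValues mimics Set_ProcessItem: results[_index] = _item.value.
func collectValues(items []interface{}) ([]interface{}, error) {
	results := make([]interface{}, len(items))
	globals := map[string]interface{}{"$results": results}
	err := forEach(globals, items, func(sc scope) error {
		item, _ := sc.get("_item")
		index, _ := sc.get("_index")
		obj, ok := item.(map[string]interface{})
		if !ok {
			return fmt.Errorf("_item is %T, want an object", item)
		}
		results[index.(int)] = obj["value"]
		return nil
	})
	return results, err
}

func main() {
	items := []interface{}{
		map[string]interface{}{"value": "a"},
		map[string]interface{}{"value": "b"},
	}
	results, err := collectValues(items)
	fmt.Println(results, err)
}
```

Writing to the slot at _index rather than appending keeps each iteration's output independent of the others, which is also what makes the same step usable in a parallel loop.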

Deep Path Access

Navigate nested structures:

"$user.profile.email"
"$items[0].name"
"$data[_index].field"

Status Patch Protocol

Services can update workflow state:

StatusPatch: &workflow.StatusPatch{
    Vars: map[string]interface{}{
        "$approval.status": "approved",
        "$approval.timestamp": time.Now(),
    },
    Control: workflow.ControlDirective{
        Pause: false,
    },
}

API Reference

Core Types

  • Workflow - Top-level workflow definition
  • Registry - External dependencies declaration
  • Step - Individual workflow step
  • ExecutionContext - Runtime state
  • Engine - Workflow executor

Execution Status

  • StatusRunning - Workflow is currently executing
  • StatusCompleted - Workflow completed (reached end naturally)
  • StatusPaused - Workflow paused (via Pause_* step or StatusPatch)
  • StatusStopped - Workflow stopped (via Stop_* step)
  • StatusFailed - Workflow failed due to error

Adapters

  • ServiceAdapter - Service call interface
  • ComponentAdapter - Component call interface
  • LLMAdapter - LLM call interface
  • FileAdapter - File operation interface

Events

  • EventStepStart - Step execution started
  • EventStepComplete - Step execution completed
  • EventStepError - Step execution failed
  • EventVariableUpdate - Variable value changed
  • EventFileWrite - File written to artifacts
  • EventPaused - Workflow paused (via Pause_* step or StatusPatch)
  • EventResumed - Workflow resumed
  • EventCompleted - Workflow completed (reached end or Stop_* step)
  • EventFailed - Workflow failed due to error
  • EventLLMStream - LLM streaming chunk

Known Limitations

S3 File Adapter

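  • Non-atomic WriteModeFailIfExists: The check-then-write pattern for WriteModeFailIfExists is not atomic. Another process could create the file between the Exists check and the PutObject call. Amazon S3 added conditional writes (the If-None-Match header on PutObject) in August 2024, but the adapter does not use them, and S3-compatible services may not support them. On backends without conditional writes, workarounds would require S3 Object Lock or versioning, which changes API semantics.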

  • No retry logic: The adapter doesn't implement retry with exponential backoff for transient S3 failures (network issues, throttling). Consider wrapping calls with your own retry logic for production use.

LiteLLM Adapter

  • No retry logic: The adapter doesn't implement retry for transient HTTP failures. For production use, consider implementing retry with exponential backoff.

  • Single HTTP client timeout: The HTTP client timeout applies to the entire request. For long-running streaming responses, individual chunk reads don't have separate timeouts.
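
Since neither adapter retries on its own, callers who need it can wrap adapter calls themselves. The sketch below is one minimal way to do that with exponential backoff. It is an assumption-based helper rather than part of the library: retry and demoRetry are hypothetical, every error is treated as transient, and no jitter is added, all of which a production wrapper would likely refine.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retry calls op up to attempts times, doubling the delay after each failure.
// It stops early if ctx is cancelled while waiting.
func retry(ctx context.Context, attempts int, base time.Duration, op func(context.Context) error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	delay := base
	for i := 0; i < attempts; i++ {
		if err = op(ctx); err == nil {
			return nil
		}
		if i == attempts-1 {
			break // no point sleeping after the final attempt
		}
		select {
		case <-time.After(delay):
			delay *= 2
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", attempts, err)
}

// demoRetry runs an operation that fails the given number of times before
// succeeding, with at most three attempts. It returns the number of calls made.
func demoRetry(failures int) (int, error) {
	calls := 0
	err := retry(context.Background(), 3, time.Millisecond, func(context.Context) error {
		calls++
		if calls <= failures {
			return errors.New("transient failure")
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := demoRetry(2)
	fmt.Println("calls:", calls, "error:", err)

	calls, err = demoRetry(5)
	fmt.Println("calls:", calls, "error:", err)
}
```

For streaming LLM calls, retrying is only safe before the first chunk has been forwarded; once output has been emitted, a retry would duplicate it.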

License

This library is provided as-is for workflow orchestration purposes.

Contributing

The library follows the v2.3 Workflow specification. For spec updates or clarifications, refer to the original specification document.

Support

For issues, questions, or contributions, please refer to the project repository.