A comprehensive Go implementation of the v2.3 Workflow specification for orchestrating multi-step processes with support for pause/resume, parallel execution, and flexible control flow.
This library implements a stateless workflow orchestration engine that supports:
- = prefix convention for distinguishing literal values from expressions in step inputs, outputs, and values. Strings without = are treated as literals; strings with the = prefix are evaluated as expressions; the == prefix escapes a literal =.
- RequestAPIKey config option sends an api_key field in the JSON request body for Bring Your Own Key passthrough to downstream providers.
- docs parameter and registry declarations.
- Pause_* step type for explicit workflow pausing (in addition to service-triggered pauses)
- Steps without a next field are treated as completed (not paused), allowing child branches to complete naturally

Install the library:

go get -u workflow
For S3 file storage support:
go get github.com/aws/aws-sdk-go-v2
go get github.com/aws/aws-sdk-go-v2/config
go get github.com/aws/aws-sdk-go-v2/service/s3
The LiteLLM adapter uses only the standard library HTTP client (no additional dependencies required).
package main
import (
"context"
"fmt"
"workflow"
)
func main() {
// Define workflow structure
wf := &workflow.Workflow{
Version: "2.3",
Name: "Simple Task Workflow",
Registry: workflow.Registry{
Services: []string{
"TaskService(input(STRING)) RETURN result(OBJECT)",
},
Components: []string{},
Vars: []string{
"$input(STRING)",
"$result(OBJECT)",
},
Files: workflow.FilesRegistry{
Inputs: []string{},
Artifacts: []string{"Process/Artifacts/*"},
},
},
Steps: []workflow.Step{
{
ID: "Service_TaskService",
In: workflow.StepInput{
"input": "=$input",
},
Out: workflow.StepOutput{
"$result": "=_result.result",
},
Next: "Stop_End",
},
{
ID: "Stop_End",
},
},
}
// Setup and execute...
}
// Create service adapter
serviceAdapter := workflow.NewDefaultServiceAdapter()
serviceAdapter.RegisterHandler("TaskService", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) {
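// Checked type assertion: a missing or non-string input returns an error instead of panicking
input, ok := params["input"].(string)
if !ok {
return nil, fmt.Errorf("TaskService: input must be a string")
}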
return &workflow.ServiceResult{
Data: map[string]interface{}{
"result": map[string]interface{}{
"status": "completed",
"message": fmt.Sprintf("Processed: %s", input),
},
},
}, nil
})
// Create adapters bundle (using default adapters)
adapters := &workflow.Adapters{
Service: serviceAdapter,
Component: workflow.NewDefaultComponentAdapter(),
LLM: workflow.NewDefaultLLMAdapter(),
File: workflow.NewDefaultFileAdapter(),
}
// Or use specialized adapters (S3 for files, LiteLLM for LLM calls)
// See "Built-in Adapters" section below for S3FileAdapter and LiteLLMAdapter
// Create engine
engine, err := workflow.NewEngine(wf)
if err != nil {
panic(err)
}
// Execute with initial variables
initialVars := map[string]interface{}{
"$input": "Hello, Workflow!",
}
result, err := engine.Execute(context.Background(), initialVars, adapters)
if err != nil {
panic(err)
}
// Process events
for event := range result.EventStream {
fmt.Printf("Event: %s - %s\n", event.Type, event.StepID)
}
Call project services with input/output mapping.
{
ID: "Service_MyService",
In: workflow.StepInput{
"param1": "=$variable1", // expression: evaluates $variable1
"param2": "literal value", // literal string (no = prefix)
"param3": true, // literal bool
},
Out: workflow.StepOutput{
"$result": "=_result.data",
},
Next: "NextStep",
}
Call system components (MCP, file operations, etc.).
{
ID: "Component_FileOps",
In: workflow.StepInput{
"operation": "read",
"path": "=$filePath",
},
Out: workflow.StepOutput{
"$fileContent": "=_result.content",
},
Next: "NextStep",
}
Call large language models with streaming support.
{
ID: "LLM_Generate",
In: workflow.StepInput{
"model": "gpt-4", // literal string
"stream": true, // literal bool
"messages": []interface{}{
map[string]interface{}{
"role": "user",
"content": "=$prompt",
},
},
},
Out: workflow.StepOutput{
"$generatedText": "=_result.content",
},
Next: "NextStep",
}
Update global variables.
{
ID: "Set_UpdateStatus",
Target: "$status",
Value: "completed", // literal string (no = prefix needed)
Next: "NextStep",
}
// To set a variable from an expression:
{
ID: "Set_Total",
Target: "$total",
Value: "=$price * $quantity", // expression (= prefix)
Next: "NextStep",
}
Write files to artifacts directory.
{
ID: "Write_Output",
Target: "Process/Artifacts/output.txt", // literal path (no = prefix)
Value: "=$content", // expression (= prefix)
Mode: "overwrite", // or "append", "failIfExists"
Next: "NextStep",
}
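The three mode strings map naturally onto file-open flags. The following is a minimal sketch for a local file, not the library's file adapter: writeFile and demo are hypothetical helpers, and the mapping of failIfExists to O_EXCL is an assumption about the intended semantics.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeFile is a hypothetical helper (not part of the workflow package) that
// maps the three mode strings onto os.OpenFile flags for a local file.
func writeFile(path, content, mode string) error {
	flags := os.O_WRONLY | os.O_CREATE
	switch mode {
	case "overwrite":
		flags |= os.O_TRUNC
	case "append":
		flags |= os.O_APPEND
	case "failIfExists":
		flags |= os.O_EXCL // creation fails if the file is already there
	default:
		return fmt.Errorf("unknown write mode %q", mode)
	}
	f, err := os.OpenFile(path, flags, 0o644)
	if err != nil {
		return err
	}
	if _, err := f.WriteString(content); err != nil {
		f.Close()
		return err
	}
	return f.Close()
}

// demo runs the three modes against one temporary file. It returns the final
// file content and the error produced by the failIfExists attempt.
func demo() (content string, existsErr, err error) {
	dir, err := os.MkdirTemp("", "wf-write")
	if err != nil {
		return "", nil, err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "output.txt")

	if err = writeFile(path, "a", "overwrite"); err != nil {
		return "", nil, err
	}
	if err = writeFile(path, "b", "append"); err != nil {
		return "", nil, err
	}
	existsErr = writeFile(path, "c", "failIfExists") // file exists, so this fails
	data, err := os.ReadFile(path)
	return string(data), existsErr, err
}

func main() {
	content, existsErr, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Printf("content=%q failIfExists error=%v\n", content, existsErr != nil)
	// content="ab" failIfExists error=true
}
```

On a local filesystem the O_EXCL flag makes the existence check and the creation a single operation, which is why this sketch needs no separate Exists call.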
Conditional branching based on expressions.
{
ID: "Branch_CheckAmount",
Cases: [][]string{
{"$amount < 1000", "Service_AutoApprove"},
{"$amount < 10000", "Service_ManagerApprove"},
{"ELSE", "Service_DirectorApprove"},
},
Next: "Stop_End",
}
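The ordering of the cases above suggests that they are tried top to bottom and that the first match wins. The README does not state this, so the sketch below is an assumption: selectCase is a hypothetical helper, and eval stands in for the engine's expression evaluator.

```go
package main

import "fmt"

// selectCase returns the target of the first case whose condition holds.
// cases uses the same [condition, target] pair layout as the Cases field;
// eval is a hypothetical stand-in for the engine's expression evaluator.
func selectCase(cases [][]string, eval func(cond string) bool) (string, bool) {
	for _, c := range cases {
		if len(c) != 2 {
			continue // skip malformed pairs
		}
		if c[0] == "ELSE" || eval(c[0]) {
			return c[1], true
		}
	}
	return "", false
}

func main() {
	amount := 2500
	// Toy evaluator: it only understands the two conditions used below.
	eval := func(cond string) bool {
		switch cond {
		case "$amount < 1000":
			return amount < 1000
		case "$amount < 10000":
			return amount < 10000
		}
		return false
	}
	cases := [][]string{
		{"$amount < 1000", "Service_AutoApprove"},
		{"$amount < 10000", "Service_ManagerApprove"},
		{"ELSE", "Service_DirectorApprove"},
	}
	target, _ := selectCase(cases, eval)
	fmt.Println(target) // Service_ManagerApprove
}
```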
Iterate over collections with serial or parallel execution.
{
ID: "Loop_ProcessItems",
Mode: "parallel", // or "serial"
Source: "$items",
Children: []string{"Service_ProcessOne"},
Next: "Stop_End",
}
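To illustrate the difference between the two modes, here is a minimal sketch using only the standard library. forEach is a hypothetical helper, not the engine's loop implementation; it assumes that a parallel loop joins all iterations before continuing and keeps results in source order.

```go
package main

import (
	"fmt"
	"sync"
)

// forEach is a hypothetical helper: it applies fn to every item, either one
// at a time ("serial") or concurrently ("parallel"), and keeps results in
// source order by writing each one to its own index.
func forEach(items []int, mode string, fn func(index, item int) int) []int {
	results := make([]int, len(items))
	if mode != "parallel" {
		for i, it := range items {
			results[i] = fn(i, it)
		}
		return results
	}
	var wg sync.WaitGroup
	for i, it := range items {
		wg.Add(1)
		go func(i, it int) {
			defer wg.Done()
			results[i] = fn(i, it) // distinct index per goroutine, no lock needed
		}(i, it)
	}
	wg.Wait() // join: continue only after every iteration has finished
	return results
}

func main() {
	double := func(_, item int) int { return item * 2 }
	fmt.Println(forEach([]int{1, 2, 3}, "serial", double))   // [2 4 6]
	fmt.Println(forEach([]int{1, 2, 3}, "parallel", double)) // [2 4 6]
}
```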
Explicitly terminate workflow execution.
{
ID: "Stop_End",
}
Explicitly pause workflow execution, waiting for external resume.
{
ID: "Pause_WaitApproval",
}
Note: Workflows only pause when:
- A Pause_* step is reached
- A service returns StatusPatch.Control.Pause = true

When a step has no next, the branch is considered completed (not paused). Child branches ending without a next are considered complete, allowing the parent to continue.
Step input values (In, Out, Value, Target for Write steps) follow these rules:
| Value | Treatment | Result |
|---|---|---|
| Non-string JSON types (bool, number, null, object, array) | Literal | Returned as-is |
| String without = prefix | Literal string | "hello" → "hello" |
| String with = prefix | Expression | "=$name" → evaluates $name |
| String with == prefix | Escaped literal | "==foo" → "=foo" |
// Literal values (no = prefix)
"hello world" // literal string "hello world"
"Process/output.txt" // literal string path
// Expression values (= prefix)
"=$variable" // evaluates variable reference
"=$object.field" // evaluates nested path
"=$array[0]" // evaluates array index
"=$age + 5" // evaluates arithmetic
"=$status == \"done\"" // evaluates comparison
// Escaped literal (== prefix, strips one =)
"==something" // literal string "=something"
// Non-string types are always literal
true // literal bool
42 // literal number
map[string]interface{}{...} // literal object
Note: If conditions, Source expressions, and Cases expressions are always evaluated as expressions directly (no = prefix needed).
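The prefix rules above can be sketched as a small classifier. This is an illustration of the convention only: classify is a hypothetical function, not the engine's evaluator, and it stops at deciding whether a value is a literal or an expression.

```go
package main

import (
	"fmt"
	"strings"
)

// classify applies the prefix rules to one step value. It returns the kind
// ("literal" or "expression") and the payload: the literal value itself, or
// the expression text with the leading = removed.
func classify(v interface{}) (string, interface{}) {
	s, ok := v.(string)
	if !ok {
		return "literal", v // bool, number, nil, map, slice: returned as-is
	}
	switch {
	case strings.HasPrefix(s, "=="):
		return "literal", s[1:] // escaped: strip exactly one =
	case strings.HasPrefix(s, "="):
		return "expression", s[1:]
	default:
		return "literal", s
	}
}

func main() {
	for _, v := range []interface{}{"hello", "=$name", "==foo", true, 42} {
		kind, payload := classify(v)
		fmt.Printf("%v -> %s %v\n", v, kind, payload)
	}
}
```

The == case has to be tested before the = case, because every escaped string also starts with a single =.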
The expression language (used after the = prefix, or directly in If/Source/Cases) supports:
// Variable references
"$variable"
"$object.field"
"$array[0]"
"$array[_index]"
// Comparisons
"$amount > 1000"
"$status == \"approved\""
"$count >= 10"
// Logical operators
"$approved == true && $amount < 5000"
"$error != null || $retry > 3"
// Arithmetic
"$total + 100"
"$price * $quantity"
"\"Result: \" + $value"
// Literals (within expressions)
"\"string literal\""
"42"
"true"
"false"
Three types of variables:
- Global variables ($xxx): Declared in registry, accessible throughout workflow
- System variables (SYSVAR.xxx): Read-only configuration from system
- Scoped variables (_item, _index, _result): Loop and step-scoped variables

Use the if property to conditionally execute steps:
{
ID: "Service_Notify",
If: "$shouldNotify == true", // If is always an expression (no = prefix)
In: workflow.StepInput{
"message": "=$notificationText", // In values use = prefix for expressions
},
Next: "Stop_End",
}
Workflows can pause for external input (e.g., human approval). Pausing requires explicit action:
Option 1: Use a Pause_* step
{
ID: "Pause_WaitApproval",
}
Option 2: Service can trigger pause via StatusPatch
return &workflow.ServiceResult{
Data: map[string]interface{}{
"status": "pending",
},
StatusPatch: &workflow.StatusPatch{
Vars: map[string]interface{}{
"$approvalStatus": "pending",
},
Control: workflow.ControlDirective{
Pause: true,
Reason: "WAIT_APPROVAL",
},
},
}, nil
Resume later with updated state:
resumeInput := &workflow.ResumeInput{
State: pauseState, // From paused event
Patch: map[string]interface{}{
"$approved": true,
"$approvalStatus": "approved",
},
}
result, err := engine.Resume(context.Background(), resumeInput, adapters)
Important: Steps without a next field are considered completed, not paused. This allows child branches to complete naturally without pausing the entire workflow.
Real-time workflow events:
for event := range result.EventStream {
switch event.Type {
case workflow.EventStepStart:
fmt.Printf("Step started: %s\n", event.StepID)
case workflow.EventStepComplete:
fmt.Printf("Step completed: %s\n", event.StepID)
case workflow.EventVariableUpdate:
fmt.Printf("Variable updated: %v\n", event.Data)
case workflow.EventPaused:
state := event.Data["state"].(workflow.PauseState)
// Save state for later resume
_ = state // placeholder so the snippet compiles; persist state here instead
case workflow.EventLLMStream:
chunk := event.Data["chunk"].(string)
fmt.Print(chunk)
}
}
The registry declares all external dependencies:
Registry: workflow.Registry{
// Service signatures (VL format)
Services: []string{
"ServiceName(param1(TYPE1), param2(TYPE2)) RETURN result1(TYPE1), result2(TYPE2)",
},
// Component IDs
Components: []string{"FileOps", "MCP_Search"},
// Global variables (VL format)
Vars: []string{
"$varName(TYPE)",
"$items([OBJECT])",
},
// File access boundaries
Files: workflow.FilesRegistry{
Inputs: []string{"Process/Input/*"}, // Read-only
Artifacts: []string{"Process/Artifacts/*"}, // Writable
},
}
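As an illustration of the variable declaration format, the sketch below splits a declaration into its name and type. The pattern is an assumption inferred from the two examples above, not the specification's grammar, and reading [OBJECT] as "list of OBJECT" is likewise a guess; parseVarDecl is a hypothetical helper.

```go
package main

import (
	"fmt"
	"regexp"
)

// varDecl matches declarations shaped like "$name(TYPE)". This pattern is an
// assumption inferred from the examples, not the specification's grammar.
var varDecl = regexp.MustCompile(`^\$([A-Za-z_][A-Za-z0-9_]*)\((.+)\)$`)

// parseVarDecl splits a declaration into variable name, element type, and
// whether the type is written in list form such as [OBJECT].
func parseVarDecl(decl string) (name, typ string, isList bool, err error) {
	m := varDecl.FindStringSubmatch(decl)
	if m == nil {
		return "", "", false, fmt.Errorf("malformed variable declaration %q", decl)
	}
	name, typ = m[1], m[2]
	if len(typ) > 2 && typ[0] == '[' && typ[len(typ)-1] == ']' {
		typ, isList = typ[1:len(typ)-1], true
	}
	return name, typ, isList, nil
}

func main() {
	for _, d := range []string{"$input(STRING)", "$items([OBJECT])", "items"} {
		name, typ, isList, err := parseVarDecl(d)
		fmt.Println(name, typ, isList, err)
	}
}
```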
For AWS S3 storage (or S3-compatible services like MinIO):
import (
"context"
"workflow"
)
// Create S3 file adapter
ctx := context.Background()
s3Adapter, err := workflow.NewS3FileAdapter(ctx, workflow.S3Config{
Bucket: "my-workflow-bucket",
Prefix: "workflows/", // Optional key prefix
Region: "us-east-1",
// Optional: Custom endpoint for MinIO/LocalStack
// Endpoint: "http://localhost:9000",
// UsePathStyle: true,
// Optional: Explicit credentials (otherwise uses AWS credential chain)
// AccessKeyID: "access-key",
// SecretAccessKey: "secret-key",
})
if err != nil {
panic(err)
}
adapters := &workflow.Adapters{
Service: serviceAdapter,
Component: componentAdapter,
LLM: llmAdapter,
File: s3Adapter,
}
For LLM calls through a LiteLLM proxy server:
// Create LiteLLM adapter
liteLLMAdapter := workflow.NewLiteLLMAdapter(workflow.LiteLLMConfig{
BaseURL: "http://localhost:4000",
APIKey: "your-api-key", // Optional: sent in Authorization header
RequestAPIKey: "user-provider-key", // Optional: sent as "api_key" in request body (BYOK)
Timeout: 5 * time.Minute, // Optional (default: 5 minutes)
})
adapters := &workflow.Adapters{
Service: serviceAdapter,
Component: componentAdapter,
LLM: liteLLMAdapter,
File: fileAdapter,
}
Configuration options:
| Field | Description |
|---|---|
| BaseURL | LiteLLM server URL (default: http://localhost:4000) |
| APIKey | Sent in Authorization: Bearer <key> header to authenticate with the LiteLLM proxy |
| RequestAPIKey | Sent as api_key in the JSON request body for BYOK (Bring Your Own Key) passthrough to downstream providers |
| Model | Optional model override (overwrites model in all LLM requests) |
| CacheControl | Optional cache_control override for Anthropic models |
| Timeout | HTTP client timeout (default: 5 minutes) |
Implement custom adapters for your infrastructure:
type MyServiceAdapter struct {
client *http.Client
}
func (a *MyServiceAdapter) Call(ctx context.Context, serviceName string, params map[string]interface{}) (*workflow.ServiceResult, error) {
// Your custom service call logic
resp, err := a.client.Post(
"https://api.example.com/"+serviceName,
"application/json",
paramsToJSON(params),
)
if err != nil {
return nil, err
}
defer resp.Body.Close() // always release the connection
// ... handle response
return &workflow.ServiceResult{
Data: resultData,
}, nil
}
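The adapter above calls a paramsToJSON helper that is not shown. One possible shape for it is sketched below, as an assumption rather than the project's actual helper. Unlike the one-argument call in the adapter, this version also returns the encoding error, so the call site would need to check it before calling Post.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// paramsToJSON is one possible shape for the helper: it encodes params as a
// JSON object and returns a reader usable as an HTTP request body.
func paramsToJSON(params map[string]interface{}) (io.Reader, error) {
	b, err := json.Marshal(params)
	if err != nil {
		return nil, fmt.Errorf("encode params: %w", err)
	}
	return bytes.NewReader(b), nil
}

func main() {
	body, err := paramsToJSON(map[string]interface{}{"input": "hello", "retries": 3})
	if err != nil {
		panic(err)
	}
	raw, err := io.ReadAll(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw)) // {"input":"hello","retries":3}
}
```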
Run the examples:
go test -v ./...
Example tests demonstrate:
- Validating a workflow with workflow.Validate() before execution

Children are executed in parallel:
{
ID: "Service_Parent",
Children: []string{"Service_A", "Service_B", "Service_C"},
Next: "Service_AfterJoin", // Waits for all children to complete
}
Access loop context with _item and _index:
{
ID: "Set_ProcessItem",
Target: "$results[_index]",
Value: "=_item.value",
}
Navigate nested structures:
"$user.profile.email"
"$items[0].name"
"$data[_index].field"
Services can update workflow state:
StatusPatch: &workflow.StatusPatch{
Vars: map[string]interface{}{
"$approval.status": "approved",
"$approval.timestamp": time.Now(),
},
Control: workflow.ControlDirective{
Pause: false,
},
}
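The patch above uses dotted keys such as $approval.status. How the engine applies them is not documented here; the sketch below assumes that a dotted key addresses a field inside a nested object and that missing intermediate objects are created. setPath and applyPatch are hypothetical helpers, not library functions.

```go
package main

import (
	"fmt"
	"strings"
)

// setPath writes value at a dotted path such as "$approval.status", creating
// intermediate objects when they are missing (an assumed behavior).
func setPath(vars map[string]interface{}, path string, value interface{}) error {
	parts := strings.Split(path, ".")
	cur := vars
	for _, key := range parts[:len(parts)-1] {
		next, exists := cur[key]
		if !exists {
			child := map[string]interface{}{}
			cur[key] = child
			cur = child
			continue
		}
		obj, ok := next.(map[string]interface{})
		if !ok {
			return fmt.Errorf("%q in %q is not an object", key, path)
		}
		cur = obj
	}
	cur[parts[len(parts)-1]] = value
	return nil
}

// applyPatch applies every entry of patch to vars and stops at the first error.
func applyPatch(vars, patch map[string]interface{}) error {
	for path, value := range patch {
		if err := setPath(vars, path, value); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	vars := map[string]interface{}{
		"$approval": map[string]interface{}{"status": "pending"},
	}
	err := applyPatch(vars, map[string]interface{}{"$approval.status": "approved"})
	fmt.Println(vars["$approval"], err) // map[status:approved] <nil>
}
```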
Core types:

- Workflow - Top-level workflow definition
- Registry - External dependencies declaration
- Step - Individual workflow step
- ExecutionContext - Runtime state
- Engine - Workflow executor

Execution statuses:

- StatusRunning - Workflow is currently executing
- StatusCompleted - Workflow completed (reached end naturally)
- StatusPaused - Workflow paused (via Pause_* step or StatusPatch)
- StatusStopped - Workflow stopped (via Stop_* step)
- StatusFailed - Workflow failed due to error

Adapter interfaces:

- ServiceAdapter - Service call interface
- ComponentAdapter - Component call interface
- LLMAdapter - LLM call interface
- FileAdapter - File operation interface

Event types:

- EventStepStart - Step execution started
- EventStepComplete - Step execution completed
- EventStepError - Step execution failed
- EventVariableUpdate - Variable value changed
- EventFileWrite - File written to artifacts
- EventPaused - Workflow paused (via Pause_* step or StatusPatch)
- EventResumed - Workflow resumed
- EventCompleted - Workflow completed (reached end or Stop_* step)
- EventFailed - Workflow failed due to error
- EventLLMStream - LLM streaming chunk

S3FileAdapter limitations:

Non-atomic WriteModeFailIfExists: The check-then-write pattern for WriteModeFailIfExists is not atomic. Another process could create the file between the Exists check and the PutObject call. S3 doesn't support conditional writes natively. Workarounds would require S3 Object Lock or versioning, which changes API semantics.
No retry logic: The adapter doesn't implement retry with exponential backoff for transient S3 failures (network issues, throttling). Consider wrapping calls with your own retry logic for production use.
LiteLLMAdapter limitations:

No retry logic: The adapter doesn't implement retry for transient HTTP failures. For production use, consider implementing retry with exponential backoff.
Single HTTP client timeout: The HTTP client timeout applies to the entire request. For long-running streaming responses, individual chunk reads don't have separate timeouts.
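Since neither adapter retries on its own, callers may want to wrap adapter calls themselves. The following is a minimal sketch of retry with exponential backoff using only the standard library. retry and runFlaky are hypothetical names, and the sketch retries on every error, whereas production code would normally retry only errors known to be transient.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retry calls op up to attempts times, doubling the wait after each failure.
// It stops early when ctx is cancelled. All names here are illustrative.
func retry(ctx context.Context, attempts int, base time.Duration, op func() error) error {
	var err error
	delay := base
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if i == attempts-1 {
			break // no sleep after the final attempt
		}
		select {
		case <-time.After(delay):
			delay *= 2
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

// runFlaky retries an operation that fails `failures` times before succeeding
// and reports how many calls were made.
func runFlaky(failures, attempts int) (int, error) {
	calls := 0
	err := retry(context.Background(), attempts, time.Millisecond, func() error {
		calls++
		if calls <= failures {
			return errors.New("transient failure")
		}
		return nil
	})
	return calls, err
}

func main() {
	fmt.Println(runFlaky(2, 5)) // 3 <nil>
}
```

In practice the op closure would wrap a single adapter call, such as a file write or an LLM request, and the base delay would be far larger than the millisecond used here for demonstration.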
This library is provided as-is for workflow orchestration purposes.
The library follows the v2.3 Workflow specification. For spec updates or clarifications, refer to the original specification document.
For issues, questions, or contributions, please refer to the project repository.