// workflow — local runner for the Workflow Engine // // Usage: // // workflow server [--port 8080] [--config .env] // workflow run -workflow FILE -workspace-id ID [-params JSON] [-mode MODE] [-v] // workflow help // // Config (.env in working directory, or --config flag): // // LLM_URL=http://localhost:4000 // LLM_KEY=sk-xxx // LLM_MODEL=gpt-4o // WORKSPACE_ROOT=./workspace // PORT=8080 package main import ( "context" "encoding/json" "flag" "fmt" "log" "net/http" "os" "strings" "time" "workflow" "workflow/config" ) func main() { if len(os.Args) < 2 { printHelp() os.Exit(1) } switch os.Args[1] { case "server": cmdServer(os.Args[2:]) case "run": cmdRun(os.Args[2:]) case "help", "--help", "-h": printHelp() default: fmt.Fprintf(os.Stderr, "unknown command: %q\n\n", os.Args[1]) printHelp() os.Exit(1) } } func printHelp() { fmt.Print(`workflow — Workflow Engine local runner Commands: server Start local HTTP server (POST /run → SSE event stream) run Execute a workflow file directly in the terminal help Show this help Server usage: workflow server [--port 8080] [--config .env] Run usage: workflow run -workflow FILE -workspace-id ID [options] -workflow Workflow JSON file (required) -workspace-id Project identifier / gid (required) -params Input params as JSON: '{"userRequest":"..."}' -mode create | patch | regenerate | validate (default: create) -config Config file path (default: .env in CWD) -v Verbose: print all events Config file (.env or --config path): LLM_URL=http://localhost:4000 LLM_KEY=sk-xxx LLM_MODEL=gpt-4o WORKSPACE_ROOT=./workspace PORT=8080 `) } // ───────────────────────────────────────────────────────────────────────────── // server subcommand // ───────────────────────────────────────────────────────────────────────────── func cmdServer(args []string) { fs := flag.NewFlagSet("server", flag.ExitOnError) cfgFile := fs.String("config", "", "Config file (default: .env in CWD)") fs.Usage = func() { fmt.Fprintln(os.Stderr, "Usage: workflow server [--config FILE]") fs.PrintDefaults() } fs.Parse(args) cfg := config.Load(*cfgFile) log.SetFlags(log.Ltime | log.Lmsgprefix) log.SetPrefix("workflow ") log.Printf("endpoint : POST http://localhost:%s/run", cfg.Server.Port) log.Printf("workspace : %s//", cfg.Workspace.Root) log.Printf("llm url : %s", cfg.LLM.URL) log.Printf("llm model : %s", cfg.LLM.Model) log.Printf("llm key : %s", config.MaskKey(cfg.LLM.Key)) mux := http.NewServeMux() mux.HandleFunc("/run", serverRunHandler(cfg)) mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") fmt.Fprintf(w, `{"status":"ok","ts":%q}`+"\n", nowTS()) }) srv := &http.Server{ Addr: ":" + cfg.Server.Port, Handler: mux, ReadTimeout: 30 * time.Second, IdleTimeout: 120 * time.Second, // WriteTimeout intentionally 0 — SSE streams can be long } log.Printf("Listening on :%s", cfg.Server.Port) if err := srv.ListenAndServe(); err != nil { log.Fatal(err) } } // RunRequest is the JSON body for POST /run. type RunRequest struct { WorkflowID string `json:"workflowId"` WorkflowDef map[string]interface{} `json:"workflowDef"` WorkflowFile string `json:"workflowFile"` RunParams workflow.RunParams `json:"runParams"` } // sseEvent mirrors the spec §13.2 event shape. type sseEvent struct { RunID string `json:"run_id"` Seq int `json:"seq"` TS string `json:"ts"` Type string `json:"type"` StepID *string `json:"step_id"` Payload map[string]interface{} `json:"payload"` } func writeSSE(w http.ResponseWriter, f http.Flusher, ev sseEvent) { data, _ := json.Marshal(ev) fmt.Fprintf(w, "data: %s\n\n", data) f.Flush() } func serverRunHandler(cfg config.Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "POST /run only", http.StatusMethodNotAllowed) return } var req RunRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest) return } wf, err := loadWorkflow(req) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } eng, err := workflow.NewEngine(wf) if err != nil { http.Error(w, "engine: "+err.Error(), http.StatusBadRequest) return } adapters, err := buildAdapters(cfg, req.RunParams.WorkspaceID) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming unsupported", http.StatusInternalServerError) return } runID := fmt.Sprintf("run_%d", time.Now().UnixMilli()) label := req.WorkflowID if label == "" { label = wf.Name } log.Printf("[%s] start workflow=%s gid=%s mode=%s", runID, label, req.RunParams.WorkspaceID, req.RunParams.Mode) result, err := eng.ExecuteWithRunParams(r.Context(), nil, adapters, req.RunParams) if err != nil { writeSSE(w, flusher, sseEvent{ RunID: runID, Seq: 1, TS: nowTS(), Type: "workflow_failed", Payload: map[string]interface{}{"error": err.Error()}, }) return } seq := 1 for ev := range result.RunEventStream { payload := ev.Payload if payload == nil { payload = map[string]interface{}{} } writeSSE(w, flusher, sseEvent{ RunID: runID, Seq: seq, TS: nowTS(), Type: string(ev.Type), StepID: ev.StepID, Payload: payload, }) seq++ } log.Printf("[%s] done status=%s events=%d", runID, result.Context.Status, seq-1) } } // ───────────────────────────────────────────────────────────────────────────── // run subcommand // ───────────────────────────────────────────────────────────────────────────── func cmdRun(args []string) { fs := flag.NewFlagSet("run", flag.ExitOnError) workflowFile := fs.String("workflow", "", "Workflow JSON file (required)") workspaceID := fs.String("workspace-id", envOr("WORKSPACE_ID", ""), "Project identifier / gid (required)") workspaceRoot := fs.String("workspace-root", "", "Override WORKSPACE_ROOT from config") paramsJSON := fs.String("params", "", `Input params JSON: '{"key":"value"}'`) mode := fs.String("mode", "create", "create|patch|regenerate|validate") cfgFile := fs.String("config", "", "Config file (default: .env in CWD)") verbose := fs.Bool("v", false, "Verbose: print all events") fs.Usage = func() { fmt.Fprintln(os.Stderr, "Usage: workflow run -workflow FILE -workspace-id ID [options]") fs.PrintDefaults() } fs.Parse(args) if *workflowFile == "" || *workspaceID == "" { fs.Usage() os.Exit(1) } cfg := config.Load(*cfgFile) if *workspaceRoot != "" { cfg.Workspace.Root = *workspaceRoot } wfBytes, err := os.ReadFile(*workflowFile) if err != nil { fatalf("read workflow: %v", err) } var wf workflow.Workflow if err := json.Unmarshal(wfBytes, &wf); err != nil { fatalf("parse workflow: %v", err) } var params map[string]interface{} if strings.TrimSpace(*paramsJSON) != "" { if err := json.Unmarshal([]byte(*paramsJSON), ¶ms); err != nil { fatalf("parse -params: %v", err) } } fmt.Printf("▶ workflow : %s (v%s)\n", wf.Name, wf.Version) fmt.Printf(" workspace-id: %s\n", *workspaceID) fmt.Printf(" workspace : %s/%s\n", cfg.Workspace.Root, *workspaceID) fmt.Printf(" mode : %s\n", *mode) if len(params) > 0 { raw, _ := json.Marshal(params) fmt.Printf(" params : %s\n", raw) } fmt.Printf(" llm : %s model=%s\n\n", cfg.LLM.URL, cfg.LLM.Model) adapters, err := buildAdapters(cfg, *workspaceID) if err != nil { fatalf("adapters: %v", err) } eng, err := workflow.NewEngine(&wf) if err != nil { fatalf("engine: %v", err) } start := time.Now() result, err := eng.ExecuteWithRunParams(context.Background(), nil, adapters, workflow.RunParams{Params: params, WorkspaceID: *workspaceID, Mode: *mode}) if err != nil { fatalf("execute: %v", err) } var steps, llms int for ev := range result.RunEventStream { switch ev.Type { case workflow.RunEventStepStart: if ev.StepID != nil { fmt.Printf(" → %-30s\n", *ev.StepID) } case workflow.RunEventStepDone: steps++ case workflow.RunEventStepSkipped: if ev.StepID != nil { fmt.Printf(" ↷ %-30s (skipped)\n", *ev.StepID) } case workflow.RunEventStepError: id := "" if ev.StepID != nil { id = *ev.StepID } fmt.Printf(" ✗ %-30s ERROR: %v\n", id, ev.Payload["error"]) case workflow.RunEventLLMDone: llms++ id := "" if ev.StepID != nil { id = *ev.StepID } fmt.Printf(" llm_done %s %vms model=%v\n", id, ev.Payload["latency_ms"], ev.Payload["model"]) if u, ok := ev.Payload["usage"].(map[string]interface{}); ok { fmt.Printf(" tokens in=%v out=%v total=%v\n", u["input_tokens"], u["output_tokens"], u["total_tokens"]) } default: if *verbose { raw, _ := json.Marshal(ev.Payload) id := "" if ev.StepID != nil { id = *ev.StepID } fmt.Printf(" ~ %-20s %-25s %s\n", ev.Type, id, raw) } } } fmt.Printf("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n") status := result.Context.Status icon := "✓" if status == workflow.StatusFailed { icon = "✗" } fmt.Printf("%s %s steps=%d llm=%d %v\n", icon, status, steps, llms, time.Since(start).Round(time.Millisecond)) if len(result.Context.Variables) > 0 { fmt.Println("\n── variables ──") for k, v := range result.Context.Variables { if s, ok := v.(string); ok { if len(s) > 120 { fmt.Printf(" %-20s = %q…\n", k, s[:120]) } else { fmt.Printf(" %-20s = %q\n", k, s) } } else { raw, _ := json.MarshalIndent(v, " ", " ") fmt.Printf(" %-20s = %s\n", k, raw) } } } if len(result.Context.Artifacts) > 0 { fmt.Println("\n── artifacts ──") for p := range result.Context.Artifacts { fmt.Printf(" %s/%s%s\n", cfg.Workspace.Root, *workspaceID, p) } } fmt.Println() if status == workflow.StatusFailed { os.Exit(1) } } // ───────────────────────────────────────────────────────────────────────────── // shared helpers // ───────────────────────────────────────────────────────────────────────────── func loadWorkflow(req RunRequest) (*workflow.Workflow, error) { var wf workflow.Workflow switch { case req.WorkflowFile != "": data, err := os.ReadFile(req.WorkflowFile) if err != nil { return nil, fmt.Errorf("read workflowFile: %w", err) } if err := json.Unmarshal(data, &wf); err != nil { return nil, fmt.Errorf("parse workflowFile: %w", err) } case req.WorkflowDef != nil: raw, _ := json.Marshal(req.WorkflowDef) if err := json.Unmarshal(raw, &wf); err != nil { return nil, fmt.Errorf("parse workflowDef: %w", err) } default: return nil, fmt.Errorf("provide workflowDef or workflowFile") } return &wf, nil } func buildAdapters(cfg config.Config, workspaceID string) (*workflow.Adapters, error) { gid := workspaceID if gid == "" { gid = "default" } // v3.16+: Build multi-provider LLM adapter registry registry := buildLLMRegistry(cfg) file, err := workflow.NewLocalFileAdapter(cfg.Workspace.Root, gid) if err != nil { return nil, fmt.Errorf("file adapter: %w", err) } return &workflow.Adapters{ Service: workflow.NewDefaultServiceAdapter(), Component: workflow.NewDefaultComponentAdapter(), LLM: registry, // LLMAdapterRegistry implements LLMAdapter LLMAdapterRegistry: registry, File: file, }, nil } // buildLLMRegistry creates a multi-provider LLM adapter registry (v3.16+). // It reads cfg.LLM (global default) and cfg.Providers (per-provider) to build // adapters for each known provider (openai, anthropic), then determines // the default adapter from LLM_MODEL or LLM_URL. func buildLLMRegistry(cfg config.Config) *workflow.LLMAdapterRegistry { // Determine default provider from LLM_MODEL (provider/modelId format) or LLM_URL defaultProvider := "openai" defaultModelId := cfg.LLM.Model if strings.Contains(cfg.LLM.Model, "/") { parts := strings.SplitN(cfg.LLM.Model, "/", 2) defaultProvider = parts[0] defaultModelId = parts[1] } else if strings.Contains(cfg.LLM.URL, "anthropic.com") { defaultProvider = "anthropic" } // Build OpenAI adapter openaiKey := cfg.LLM.Key // fallback openaiModel := defaultModelId openaiURL := cfg.LLM.URL if p, ok := cfg.Providers["openai"]; ok { if p.APIKey != "" { openaiKey = p.APIKey } if p.Model != "" { openaiModel = p.Model } if p.BaseURL != "" { openaiURL = p.BaseURL } } // Only use openaiModel as default if openai IS the default provider if defaultProvider != "openai" { if p, ok := cfg.Providers["openai"]; ok && p.Model != "" { openaiModel = p.Model } else { openaiModel = "" // no default model for non-default provider } } openaiAdapter := workflow.NewOpenAIAdapter(workflow.OpenAIConfig{ BaseURL: openaiURL, APIKey: openaiKey, Model: openaiModel, }) // Build Anthropic adapter anthropicKey := cfg.LLM.Key // fallback anthropicModel := "" anthropicURL := "https://api.anthropic.com" if p, ok := cfg.Providers["anthropic"]; ok { if p.APIKey != "" { anthropicKey = p.APIKey } if p.Model != "" { anthropicModel = p.Model } if p.BaseURL != "" { anthropicURL = p.BaseURL } } // Use defaultModelId if anthropic is the default provider and no provider-specific model if defaultProvider == "anthropic" && anthropicModel == "" { anthropicModel = defaultModelId } anthropicAdapter := workflow.NewAnthropicAdapter(workflow.AnthropicConfig{ APIKey: anthropicKey, Model: anthropicModel, BaseURL: anthropicURL, }) // Select default adapter based on determined provider var defaultAdapter workflow.LLMAdapter switch defaultProvider { case "anthropic": defaultAdapter = anthropicAdapter default: defaultAdapter = openaiAdapter } registry := workflow.NewLLMAdapterRegistry(defaultAdapter, defaultProvider) registry.Register("openai", openaiAdapter) registry.Register("anthropic", anthropicAdapter) return registry } func nowTS() string { return time.Now().UTC().Format("2006-01-02T15:04:05.000Z") } func envOr(key, fallback string) string { if v := os.Getenv(key); v != "" { return v } return fallback } func fatalf(format string, args ...interface{}) { fmt.Fprintf(os.Stderr, "error: "+format+"\n", args...) os.Exit(1) }