| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525 |
- // workflow — local runner for the Workflow Engine
- //
- // Usage:
- //
- // workflow server [--port 8080] [--config .env]
- // workflow run -workflow FILE -workspace-id ID [-params JSON] [-mode MODE] [-v]
- // workflow help
- //
- // Config (.env in working directory, or --config flag):
- //
- // LLM_URL=http://localhost:4000
- // LLM_KEY=sk-xxx
- // LLM_MODEL=gpt-4o
- // WORKSPACE_ROOT=./workspace
- // PORT=8080
- package main
- import (
- "context"
- "encoding/json"
- "flag"
- "fmt"
- "log"
- "net/http"
- "os"
- "strings"
- "time"
- "workflow"
- "workflow/config"
- )
- func main() {
- if len(os.Args) < 2 {
- printHelp()
- os.Exit(1)
- }
- switch os.Args[1] {
- case "server":
- cmdServer(os.Args[2:])
- case "run":
- cmdRun(os.Args[2:])
- case "help", "--help", "-h":
- printHelp()
- default:
- fmt.Fprintf(os.Stderr, "unknown command: %q\n\n", os.Args[1])
- printHelp()
- os.Exit(1)
- }
- }
- func printHelp() {
- fmt.Print(`workflow — Workflow Engine local runner
- Commands:
- server Start local HTTP server (POST /run → SSE event stream)
- run Execute a workflow file directly in the terminal
- help Show this help
- Server usage:
- workflow server [--port 8080] [--config .env]
- Run usage:
- workflow run -workflow FILE -workspace-id ID [options]
- -workflow Workflow JSON file (required)
- -workspace-id Project identifier / gid (required)
- -params Input params as JSON: '{"userRequest":"..."}'
- -mode create | patch | regenerate | validate (default: create)
- -config Config file path (default: .env in CWD)
- -v Verbose: print all events
- Config file (.env or --config path):
- LLM_URL=http://localhost:4000
- LLM_KEY=sk-xxx
- LLM_MODEL=gpt-4o
- WORKSPACE_ROOT=./workspace
- PORT=8080
- `)
- }
- // ─────────────────────────────────────────────────────────────────────────────
- // server subcommand
- // ─────────────────────────────────────────────────────────────────────────────
- func cmdServer(args []string) {
- fs := flag.NewFlagSet("server", flag.ExitOnError)
- cfgFile := fs.String("config", "", "Config file (default: .env in CWD)")
- fs.Usage = func() {
- fmt.Fprintln(os.Stderr, "Usage: workflow server [--config FILE]")
- fs.PrintDefaults()
- }
- fs.Parse(args)
- cfg := config.Load(*cfgFile)
- log.SetFlags(log.Ltime | log.Lmsgprefix)
- log.SetPrefix("workflow ")
- log.Printf("endpoint : POST http://localhost:%s/run", cfg.Server.Port)
- log.Printf("workspace : %s/<workspaceId>/", cfg.Workspace.Root)
- log.Printf("llm url : %s", cfg.LLM.URL)
- log.Printf("llm model : %s", cfg.LLM.Model)
- log.Printf("llm key : %s", config.MaskKey(cfg.LLM.Key))
- mux := http.NewServeMux()
- mux.HandleFunc("/run", serverRunHandler(cfg))
- mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
- w.Header().Set("Content-Type", "application/json")
- fmt.Fprintf(w, `{"status":"ok","ts":%q}`+"\n", nowTS())
- })
- srv := &http.Server{
- Addr: ":" + cfg.Server.Port,
- Handler: mux,
- ReadTimeout: 30 * time.Second,
- IdleTimeout: 120 * time.Second,
- // WriteTimeout intentionally 0 — SSE streams can be long
- }
- log.Printf("Listening on :%s", cfg.Server.Port)
- if err := srv.ListenAndServe(); err != nil {
- log.Fatal(err)
- }
- }
- // RunRequest is the JSON body for POST /run.
- type RunRequest struct {
- WorkflowID string `json:"workflowId"`
- WorkflowDef map[string]interface{} `json:"workflowDef"`
- WorkflowFile string `json:"workflowFile"`
- RunParams workflow.RunParams `json:"runParams"`
- }
- // sseEvent mirrors the spec §13.2 event shape.
- type sseEvent struct {
- RunID string `json:"run_id"`
- Seq int `json:"seq"`
- TS string `json:"ts"`
- Type string `json:"type"`
- StepID *string `json:"step_id"`
- Payload map[string]interface{} `json:"payload"`
- }
- func writeSSE(w http.ResponseWriter, f http.Flusher, ev sseEvent) {
- data, _ := json.Marshal(ev)
- fmt.Fprintf(w, "data: %s\n\n", data)
- f.Flush()
- }
- func serverRunHandler(cfg config.Config) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodPost {
- http.Error(w, "POST /run only", http.StatusMethodNotAllowed)
- return
- }
- var req RunRequest
- if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
- http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
- return
- }
- wf, err := loadWorkflow(req)
- if err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
- return
- }
- eng, err := workflow.NewEngine(wf)
- if err != nil {
- http.Error(w, "engine: "+err.Error(), http.StatusBadRequest)
- return
- }
- adapters, err := buildAdapters(cfg, req.RunParams.WorkspaceID)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- w.Header().Set("Content-Type", "text/event-stream")
- w.Header().Set("Cache-Control", "no-cache")
- w.Header().Set("Connection", "keep-alive")
- w.Header().Set("X-Accel-Buffering", "no")
- flusher, ok := w.(http.Flusher)
- if !ok {
- http.Error(w, "streaming unsupported", http.StatusInternalServerError)
- return
- }
- runID := fmt.Sprintf("run_%d", time.Now().UnixMilli())
- label := req.WorkflowID
- if label == "" {
- label = wf.Name
- }
- log.Printf("[%s] start workflow=%s gid=%s mode=%s",
- runID, label, req.RunParams.WorkspaceID, req.RunParams.Mode)
- result, err := eng.ExecuteWithRunParams(r.Context(), nil, adapters, req.RunParams)
- if err != nil {
- writeSSE(w, flusher, sseEvent{
- RunID: runID, Seq: 1, TS: nowTS(), Type: "workflow_failed",
- Payload: map[string]interface{}{"error": err.Error()},
- })
- return
- }
- seq := 1
- for ev := range result.RunEventStream {
- payload := ev.Payload
- if payload == nil {
- payload = map[string]interface{}{}
- }
- writeSSE(w, flusher, sseEvent{
- RunID: runID, Seq: seq, TS: nowTS(),
- Type: string(ev.Type), StepID: ev.StepID, Payload: payload,
- })
- seq++
- }
- log.Printf("[%s] done status=%s events=%d", runID, result.Context.Status, seq-1)
- }
- }
- // ─────────────────────────────────────────────────────────────────────────────
- // run subcommand
- // ─────────────────────────────────────────────────────────────────────────────
- func cmdRun(args []string) {
- fs := flag.NewFlagSet("run", flag.ExitOnError)
- workflowFile := fs.String("workflow", "", "Workflow JSON file (required)")
- workspaceID := fs.String("workspace-id", envOr("WORKSPACE_ID", ""), "Project identifier / gid (required)")
- workspaceRoot := fs.String("workspace-root", "", "Override WORKSPACE_ROOT from config")
- paramsJSON := fs.String("params", "", `Input params JSON: '{"key":"value"}'`)
- mode := fs.String("mode", "create", "create|patch|regenerate|validate")
- cfgFile := fs.String("config", "", "Config file (default: .env in CWD)")
- verbose := fs.Bool("v", false, "Verbose: print all events")
- fs.Usage = func() {
- fmt.Fprintln(os.Stderr, "Usage: workflow run -workflow FILE -workspace-id ID [options]")
- fs.PrintDefaults()
- }
- fs.Parse(args)
- if *workflowFile == "" || *workspaceID == "" {
- fs.Usage()
- os.Exit(1)
- }
- cfg := config.Load(*cfgFile)
- if *workspaceRoot != "" {
- cfg.Workspace.Root = *workspaceRoot
- }
- wfBytes, err := os.ReadFile(*workflowFile)
- if err != nil {
- fatalf("read workflow: %v", err)
- }
- var wf workflow.Workflow
- if err := json.Unmarshal(wfBytes, &wf); err != nil {
- fatalf("parse workflow: %v", err)
- }
- var params map[string]interface{}
- if strings.TrimSpace(*paramsJSON) != "" {
- if err := json.Unmarshal([]byte(*paramsJSON), ¶ms); err != nil {
- fatalf("parse -params: %v", err)
- }
- }
- fmt.Printf("▶ workflow : %s (v%s)\n", wf.Name, wf.Version)
- fmt.Printf(" workspace-id: %s\n", *workspaceID)
- fmt.Printf(" workspace : %s/%s\n", cfg.Workspace.Root, *workspaceID)
- fmt.Printf(" mode : %s\n", *mode)
- if len(params) > 0 {
- raw, _ := json.Marshal(params)
- fmt.Printf(" params : %s\n", raw)
- }
- fmt.Printf(" llm : %s model=%s\n\n", cfg.LLM.URL, cfg.LLM.Model)
- adapters, err := buildAdapters(cfg, *workspaceID)
- if err != nil {
- fatalf("adapters: %v", err)
- }
- eng, err := workflow.NewEngine(&wf)
- if err != nil {
- fatalf("engine: %v", err)
- }
- start := time.Now()
- result, err := eng.ExecuteWithRunParams(context.Background(), nil, adapters,
- workflow.RunParams{Params: params, WorkspaceID: *workspaceID, Mode: *mode})
- if err != nil {
- fatalf("execute: %v", err)
- }
- var steps, llms int
- for ev := range result.RunEventStream {
- switch ev.Type {
- case workflow.RunEventStepStart:
- if ev.StepID != nil {
- fmt.Printf(" → %-30s\n", *ev.StepID)
- }
- case workflow.RunEventStepDone:
- steps++
- case workflow.RunEventStepSkipped:
- if ev.StepID != nil {
- fmt.Printf(" ↷ %-30s (skipped)\n", *ev.StepID)
- }
- case workflow.RunEventStepError:
- id := ""
- if ev.StepID != nil {
- id = *ev.StepID
- }
- fmt.Printf(" ✗ %-30s ERROR: %v\n", id, ev.Payload["error"])
- case workflow.RunEventLLMDone:
- llms++
- id := ""
- if ev.StepID != nil {
- id = *ev.StepID
- }
- fmt.Printf(" llm_done %s %vms model=%v\n",
- id, ev.Payload["latency_ms"], ev.Payload["model"])
- if u, ok := ev.Payload["usage"].(map[string]interface{}); ok {
- fmt.Printf(" tokens in=%v out=%v total=%v\n",
- u["input_tokens"], u["output_tokens"], u["total_tokens"])
- }
- default:
- if *verbose {
- raw, _ := json.Marshal(ev.Payload)
- id := ""
- if ev.StepID != nil {
- id = *ev.StepID
- }
- fmt.Printf(" ~ %-20s %-25s %s\n", ev.Type, id, raw)
- }
- }
- }
- fmt.Printf("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n")
- status := result.Context.Status
- icon := "✓"
- if status == workflow.StatusFailed {
- icon = "✗"
- }
- fmt.Printf("%s %s steps=%d llm=%d %v\n",
- icon, status, steps, llms, time.Since(start).Round(time.Millisecond))
- if len(result.Context.Variables) > 0 {
- fmt.Println("\n── variables ──")
- for k, v := range result.Context.Variables {
- if s, ok := v.(string); ok {
- if len(s) > 120 {
- fmt.Printf(" %-20s = %q…\n", k, s[:120])
- } else {
- fmt.Printf(" %-20s = %q\n", k, s)
- }
- } else {
- raw, _ := json.MarshalIndent(v, " ", " ")
- fmt.Printf(" %-20s = %s\n", k, raw)
- }
- }
- }
- if len(result.Context.Artifacts) > 0 {
- fmt.Println("\n── artifacts ──")
- for p := range result.Context.Artifacts {
- fmt.Printf(" %s/%s%s\n", cfg.Workspace.Root, *workspaceID, p)
- }
- }
- fmt.Println()
- if status == workflow.StatusFailed {
- os.Exit(1)
- }
- }
- // ─────────────────────────────────────────────────────────────────────────────
- // shared helpers
- // ─────────────────────────────────────────────────────────────────────────────
- func loadWorkflow(req RunRequest) (*workflow.Workflow, error) {
- var wf workflow.Workflow
- switch {
- case req.WorkflowFile != "":
- data, err := os.ReadFile(req.WorkflowFile)
- if err != nil {
- return nil, fmt.Errorf("read workflowFile: %w", err)
- }
- if err := json.Unmarshal(data, &wf); err != nil {
- return nil, fmt.Errorf("parse workflowFile: %w", err)
- }
- case req.WorkflowDef != nil:
- raw, _ := json.Marshal(req.WorkflowDef)
- if err := json.Unmarshal(raw, &wf); err != nil {
- return nil, fmt.Errorf("parse workflowDef: %w", err)
- }
- default:
- return nil, fmt.Errorf("provide workflowDef or workflowFile")
- }
- return &wf, nil
- }
- func buildAdapters(cfg config.Config, workspaceID string) (*workflow.Adapters, error) {
- gid := workspaceID
- if gid == "" {
- gid = "default"
- }
- // v3.16+: Build multi-provider LLM adapter registry
- registry := buildLLMRegistry(cfg)
- file, err := workflow.NewLocalFileAdapter(cfg.Workspace.Root, gid)
- if err != nil {
- return nil, fmt.Errorf("file adapter: %w", err)
- }
- return &workflow.Adapters{
- Service: workflow.NewDefaultServiceAdapter(),
- Component: workflow.NewDefaultComponentAdapter(),
- LLM: registry, // LLMAdapterRegistry implements LLMAdapter
- LLMAdapterRegistry: registry,
- File: file,
- }, nil
- }
- // buildLLMRegistry creates a multi-provider LLM adapter registry (v3.16+).
- // It reads cfg.LLM (global default) and cfg.Providers (per-provider) to build
- // adapters for each known provider (openai, anthropic), then determines
- // the default adapter from LLM_MODEL or LLM_URL.
- func buildLLMRegistry(cfg config.Config) *workflow.LLMAdapterRegistry {
- // Determine default provider from LLM_MODEL (provider/modelId format) or LLM_URL
- defaultProvider := "openai"
- defaultModelId := cfg.LLM.Model
- if strings.Contains(cfg.LLM.Model, "/") {
- parts := strings.SplitN(cfg.LLM.Model, "/", 2)
- defaultProvider = parts[0]
- defaultModelId = parts[1]
- } else if strings.Contains(cfg.LLM.URL, "anthropic.com") {
- defaultProvider = "anthropic"
- }
- // Build OpenAI adapter
- openaiKey := cfg.LLM.Key // fallback
- openaiModel := defaultModelId
- openaiURL := cfg.LLM.URL
- if p, ok := cfg.Providers["openai"]; ok {
- if p.APIKey != "" {
- openaiKey = p.APIKey
- }
- if p.Model != "" {
- openaiModel = p.Model
- }
- if p.BaseURL != "" {
- openaiURL = p.BaseURL
- }
- }
- // Only use openaiModel as default if openai IS the default provider
- if defaultProvider != "openai" {
- if p, ok := cfg.Providers["openai"]; ok && p.Model != "" {
- openaiModel = p.Model
- } else {
- openaiModel = "" // no default model for non-default provider
- }
- }
- openaiAdapter := workflow.NewOpenAIAdapter(workflow.OpenAIConfig{
- BaseURL: openaiURL,
- APIKey: openaiKey,
- Model: openaiModel,
- })
- // Build Anthropic adapter
- anthropicKey := cfg.LLM.Key // fallback
- anthropicModel := ""
- anthropicURL := "https://api.anthropic.com"
- if p, ok := cfg.Providers["anthropic"]; ok {
- if p.APIKey != "" {
- anthropicKey = p.APIKey
- }
- if p.Model != "" {
- anthropicModel = p.Model
- }
- if p.BaseURL != "" {
- anthropicURL = p.BaseURL
- }
- }
- // Use defaultModelId if anthropic is the default provider and no provider-specific model
- if defaultProvider == "anthropic" && anthropicModel == "" {
- anthropicModel = defaultModelId
- }
- anthropicAdapter := workflow.NewAnthropicAdapter(workflow.AnthropicConfig{
- APIKey: anthropicKey,
- Model: anthropicModel,
- BaseURL: anthropicURL,
- })
- // Select default adapter based on determined provider
- var defaultAdapter workflow.LLMAdapter
- switch defaultProvider {
- case "anthropic":
- defaultAdapter = anthropicAdapter
- default:
- defaultAdapter = openaiAdapter
- }
- registry := workflow.NewLLMAdapterRegistry(defaultAdapter, defaultProvider)
- registry.Register("openai", openaiAdapter)
- registry.Register("anthropic", anthropicAdapter)
- return registry
- }
- func nowTS() string {
- return time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
- }
- func envOr(key, fallback string) string {
- if v := os.Getenv(key); v != "" {
- return v
- }
- return fallback
- }
- func fatalf(format string, args ...interface{}) {
- fmt.Fprintf(os.Stderr, "error: "+format+"\n", args...)
- os.Exit(1)
- }
|