Implement local ci.fnl execution
Pipeline now keeps the Fennel/Lua VM alive past load and owns the
dependency graph, so run_fns stay callable and validation, topo-sort,
and id lookup share one graph. Run gained execute(), which topo-sorts
jobs and records each (ci.sh ...)'s output via a RuntimeState carried
in Lua app data. `quire ci run` drives this through a tempdir-rooted
Run for fast iteration on .quire/ci.fnl.

Assisted-by: Claude Opus 4.7 (1M context) via Claude Code
change pkqswvnvrpxzvqnsyzsykprpyxkqqlpw
commit 3f9d0cf2849e55bec3e777581c92e51cd17c91d3
author Alpha Chen <alpha@kejadlen.dev>
date
parent txuvqssk
diff --git a/Cargo.toml b/Cargo.toml
index 976faf5..58c1ed5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,6 +24,7 @@ serde = { version = "*", features = ["derive"] }
 serde_json = "*"
 serde_yaml_ng = "*"
 shell-words = "*"
+tempfile = "*"
 thiserror = "*"
 tokio = { version = "*", features = ["full"] }
 tracing = "*"
@@ -34,5 +35,4 @@ walkdir = "*"
 [dev-dependencies]
 assert_cmd = "*"
 predicates = "*"
-tempfile = "*"
 serde_json = "*"
diff --git a/src/bin/quire/commands/ci.rs b/src/bin/quire/commands/ci.rs
index 26f397f..773de3a 100644
--- a/src/bin/quire/commands/ci.rs
+++ b/src/bin/quire/commands/ci.rs
@@ -1,7 +1,8 @@
 use std::path::PathBuf;
 
 use miette::{IntoDiagnostic, Result};
-use quire::ci::{Ci, CommitRef};
+use quire::Quire;
+use quire::ci::{Ci, CommitRef, RunMeta, RunState, Runs};
 
 /// Validate a repo's ci.fnl without executing any jobs.
 ///
@@ -35,6 +36,82 @@ pub async fn validate(maybe_sha: Option<&str>) -> Result<()> {
     Ok(())
 }
 
+/// Execute a repo's ci.fnl locally for testing.
+///
+/// Loads the pipeline at the resolved commit (working-copy `@` by
+/// default), creates a transient Run rooted at a tempdir, drives the
+/// pipeline through it, and prints each job's `(ci.sh …)` output to
+/// stdout. The tempdir is removed when the command exits.
+pub async fn run(quire: &Quire, maybe_sha: Option<&str>) -> Result<()> {
+    let repo_path = discover_repo()?;
+    let commit = resolve_commit(maybe_sha)?;
+    let ci = Ci::new(repo_path);
+
+    // Pull secrets from the global config; absence is fine for local
+    // testing. A broken-but-present config is a real error.
+    let secrets = match quire.global_config() {
+        Ok(c) => c.secrets,
+        Err(quire::Error::ConfigNotFound(_)) => std::collections::HashMap::new(),
+        Err(e) => return Err(e).into_diagnostic(),
+    };
+
+    let Some(pipeline) = ci.load(&commit, secrets)? else {
+        println!("No ci.fnl found at {}.", commit.display);
+        return Ok(());
+    };
+
+    // Tempdir for run artifacts. TODO: switch to an XDG cache dir
+    // (e.g. $XDG_CACHE_HOME/quire/local-runs) so logs survive past the
+    // command and `tail -f` becomes useful.
+    let tmp = tempfile::tempdir().into_diagnostic()?;
+    let runs = Runs::new(tmp.path().to_path_buf());
+
+    let meta = RunMeta {
+        sha: commit.sha.clone(),
+        r#ref: "@".to_string(),
+        pushed_at: jiff::Timestamp::now(),
+    };
+
+    let mut run = runs.create(&meta)?;
+    println!("Run {}: executing at {}", run.id(), commit.display);
+
+    let exec_result = run.execute(&pipeline);
+
+    for job in pipeline.jobs() {
+        let outputs = run.outputs(&job.id);
+        if outputs.is_empty() {
+            continue;
+        }
+        println!("\n==> {}", job.id);
+        for o in &outputs {
+            if !o.stdout.is_empty() {
+                print!("{}", o.stdout);
+            }
+            if !o.stderr.is_empty() {
+                eprint!("{}", o.stderr);
+            }
+            if o.exit != 0 {
+                println!("(exit {})", o.exit);
+            }
+        }
+    }
+
+    match (run.state(), exec_result) {
+        (RunState::Complete, _) => {
+            println!("\nRun complete.");
+            Ok(())
+        }
+        (RunState::Failed, Err(e)) => {
+            println!("\nRun failed.");
+            Err(e).into_diagnostic()
+        }
+        (state, result) => {
+            println!("\nRun ended in unexpected state: {state:?}");
+            result.into_diagnostic()
+        }
+    }
+}
+
 /// Find the repo root from the current working directory using jj.
 fn resolve_commit(maybe_sha: Option<&str>) -> Result<CommitRef> {
     match maybe_sha {
diff --git a/src/bin/quire/main.rs b/src/bin/quire/main.rs
index 6ebf739..cd3de71 100644
--- a/src/bin/quire/main.rs
+++ b/src/bin/quire/main.rs
@@ -84,6 +84,13 @@ enum CiCommands {
         #[arg(short, long)]
         sha: Option<String>,
     },
+
+    /// Execute a repo's ci.fnl locally for testing.
+    Run {
+        /// Commit SHA to run. Defaults to the working-copy revision.
+        #[arg(short, long)]
+        sha: Option<String>,
+    },
 }
 
 /// Initialize Sentry if the global config provides a DSN.
@@ -158,6 +165,7 @@ async fn main() -> Result<()> {
         },
         Commands::Ci { command } => match command {
             CiCommands::Validate { sha } => commands::ci::validate(sha.as_deref()).await?,
+            CiCommands::Run { sha } => commands::ci::run(&quire, sha.as_deref()).await?,
         },
     }
 
diff --git a/src/ci/lua.rs b/src/ci/lua.rs
index 15148c6..3440e1d 100644
--- a/src/ci/lua.rs
+++ b/src/ci/lua.rs
@@ -150,7 +150,7 @@ impl Cmd {
     // TODO: stream stdout/stderr live instead of buffering. `output()`
     // captures the full child output in memory and only returns at exit,
     // so long-running or chatty jobs show nothing until they finish.
-    fn run(self, opts: ShOpts) -> std::io::Result<Output> {
+    fn run(self, opts: ShOpts) -> std::io::Result<ShOutput> {
         let mut command: std::process::Command = self.into();
         for (k, v) in opts.env {
             command.env(k, v);
@@ -161,7 +161,7 @@ impl Cmd {
         let output = command.output()?;
         // Signal-killed processes have no exit code; collapse them to -1
         // for now. Surfacing the signal as a separate field is future work.
-        Ok(Output {
+        Ok(ShOutput {
             exit: output.status.code().unwrap_or(-1),
             stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
             stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
@@ -221,19 +221,68 @@ impl mlua::FromLua for ShOpts {
 /// A non-zero exit is reported in `:exit`, not raised as a Lua error —
 /// matches the shape `(container …)` will eventually use so callers can
 /// branch on it.
-#[derive(serde::Serialize, serde::Deserialize)]
-struct Output {
-    exit: i32,
-    stdout: String,
-    stderr: String,
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct ShOutput {
+    pub exit: i32,
+    pub stdout: String,
+    pub stderr: String,
+}
+
+/// Per-execution state the Lua bridge consults at run time.
+///
+/// `Run::execute` constructs one of these, installs it on the Lua VM
+/// via app data, and updates `current_job` around each `run_fn` call.
+/// `(ci.sh …)` reads the state on each invocation and appends its
+/// captured output to `outputs[current_job]`.
+///
+/// Outside a run (e.g. unit tests that drive `run_fn` directly), no
+/// `Rc<RuntimeState>` is installed and `(ci.sh …)` simply executes
+/// the command and returns its result without recording.
+#[derive(Debug, Default)]
+pub(super) struct RuntimeState {
+    current_job: RefCell<Option<String>>,
+    outputs: RefCell<HashMap<String, Vec<ShOutput>>>,
+}
+
+impl RuntimeState {
+    /// Mark `id` as the currently executing job. `(ci.sh …)` invocations
+    /// from this job's `run_fn` will record output under `id`.
+    pub(super) fn enter_job(&self, id: &str) {
+        *self.current_job.borrow_mut() = Some(id.to_string());
+    }
+
+    /// Clear the current-job cursor. Subsequent `(ci.sh …)` calls (if
+    /// any) won't be attributed to a job until `enter_job` is called again.
+    pub(super) fn leave_job(&self) {
+        *self.current_job.borrow_mut() = None;
+    }
+
+    /// Snapshot the recorded outputs for `id`. Empty if the job
+    /// produced none (or hasn't run).
+    pub(super) fn outputs(&self, id: &str) -> Vec<ShOutput> {
+        self.outputs.borrow().get(id).cloned().unwrap_or_default()
+    }
 }
 
 /// Body of `(ci.sh cmd opts?)`. Glue between the Lua call and
-/// `Cmd::run` — defaults the opts and converts both directions.
+/// `Cmd::run` — defaults the opts, runs the command, records output
+/// into the active `RuntimeState` (if any), and converts the result
+/// back to a Lua table.
 fn run_sh(lua: &Lua, (cmd, opts): (Cmd, Option<ShOpts>)) -> mlua::Result<mlua::Value> {
     let output = cmd
         .run(opts.unwrap_or_default())
         .map_err(mlua::Error::external)?;
+
+    if let Some(rt) = lua.app_data_ref::<Rc<RuntimeState>>() {
+        if let Some(job) = rt.current_job.borrow().as_ref() {
+            rt.outputs
+                .borrow_mut()
+                .entry(job.clone())
+                .or_default()
+                .push(output.clone());
+        }
+    }
+
     lua.to_value(&output)
 }
 
@@ -242,13 +291,8 @@ mod tests {
     use super::super::pipeline::load;
     use super::*;
 
-    fn fennel() -> Fennel {
-        Fennel::new().expect("Fennel::new() should succeed")
-    }
-
     #[test]
     fn ci_secret_returns_resolved_value() {
-        let f = fennel();
         let mut secrets = HashMap::new();
         secrets.insert(
             "github_token".to_string(),
@@ -256,8 +300,7 @@ mod tests {
         );
         let source = r#"(local ci (require :quire.ci))
 (ci.job :grab [:quire/push] (fn [_] (ci.secret :github_token)))"#;
-        let pipeline =
-            load(&f, source, "ci.fnl", "ci.fnl", secrets).expect("load should succeed");
+        let pipeline = load(source, "ci.fnl", "ci.fnl", secrets).expect("load should succeed");
         let token: String = pipeline.jobs()[0]
             .run_fn
             .call(())
@@ -267,11 +310,10 @@ mod tests {
 
     #[test]
     fn ci_secret_errors_for_unknown_name() {
-        let f = fennel();
         let source = r#"(local ci (require :quire.ci))
 (ci.job :grab [:quire/push] (fn [_] (ci.secret :missing)))"#;
-        let pipeline = load(&f, source, "ci.fnl", "ci.fnl", HashMap::new())
-            .expect("load should succeed");
+        let pipeline =
+            load(source, "ci.fnl", "ci.fnl", HashMap::new()).expect("load should succeed");
         let err = pipeline.jobs()[0]
             .run_fn
             .call::<mlua::Value>(())
@@ -284,18 +326,20 @@ mod tests {
     }
 
     /// Build a pipeline whose single job's run-fn invokes `(ci.sh …)`,
-    /// invoke it, and decode the resulting Lua table as Output through
-    /// the same VM via `lua.from_value`. Owned data, so the Fennel VM
-    /// can drop without a use-after-free.
-    fn run_sh_via_job(source: &str) -> Output {
-        let f = fennel();
+    /// invoke it, and decode the resulting Lua table as ShOutput through
+    /// the pipeline's VM via `lua.from_value`.
+    fn run_sh_via_job(source: &str) -> ShOutput {
         let pipeline =
-            load(&f, source, "ci.fnl", "ci.fnl", HashMap::new()).expect("load should succeed");
+            load(source, "ci.fnl", "ci.fnl", HashMap::new()).expect("load should succeed");
         let value: mlua::Value = pipeline.jobs()[0]
             .run_fn
             .call(())
             .expect("ci.sh call should return a value");
-        f.lua().from_value(value).expect("decode Output")
+        pipeline
+            .fennel()
+            .lua()
+            .from_value(value)
+            .expect("decode ShOutput")
     }
 
     #[test]
@@ -362,9 +406,7 @@ mod tests {
 
     #[test]
     fn ci_sh_rejects_unknown_opt_key() {
-        let f = fennel();
         let pipeline = load(
-            &f,
             r#"(local ci (require :quire.ci))
 (ci.job :go [:quire/push] (fn [_] (ci.sh "echo hi" {:cwdir "/tmp"})))"#,
             "ci.fnl",
@@ -385,9 +427,7 @@ mod tests {
 
     #[test]
     fn ci_sh_rejects_non_sequence_table_as_cmd() {
-        let f = fennel();
         let pipeline = load(
-            &f,
             r#"(local ci (require :quire.ci))
 (ci.job :go [:quire/push] (fn [_] (ci.sh {:env {:FOO "bar"}})))"#,
             "ci.fnl",
@@ -408,9 +448,7 @@ mod tests {
 
     #[test]
     fn ci_sh_rejects_empty_argv() {
-        let f = fennel();
         let pipeline = load(
-            &f,
             r#"(local ci (require :quire.ci))
 (ci.job :go [:quire/push] (fn [_] (ci.sh [])))"#,
             "ci.fnl",
diff --git a/src/ci/mod.rs b/src/ci/mod.rs
index 629b714..f735b69 100644
--- a/src/ci/mod.rs
+++ b/src/ci/mod.rs
@@ -65,10 +65,9 @@ impl Ci {
         let Some(source) = self.source(&commit.sha)? else {
             return Ok(None);
         };
-        let fennel = crate::fennel::Fennel::new()?;
         let name = CI_FNL.to_string();
         let lua_name = format!("{}:{CI_FNL}", commit.sha);
-        let pipeline = pipeline::load(&fennel, &source, &lua_name, &name, secrets)?;
+        let pipeline = pipeline::load(&source, &lua_name, &name, secrets)?;
         Ok(Some(pipeline))
     }
 
@@ -173,11 +172,10 @@ fn trigger_ref(
 
     run.transition(RunState::Active)?;
 
-    let fennel = crate::fennel::Fennel::new()?;
     let name = CI_FNL.to_string();
     let lua_name = format!("{}:{CI_FNL}", push_ref.new_sha);
 
-    match pipeline::load(&fennel, &source, &lua_name, &name, secrets) {
+    match pipeline::load(&source, &lua_name, &name, secrets) {
         Ok(_pipeline) => run.transition(RunState::Complete)?,
         Err(e) => {
             run.transition(RunState::Failed)?;
diff --git a/src/ci/pipeline.rs b/src/ci/pipeline.rs
index 2ba4a97..b4373d6 100644
--- a/src/ci/pipeline.rs
+++ b/src/ci/pipeline.rs
@@ -7,12 +7,18 @@
 use std::collections::HashMap;
 
 use miette::{NamedSource, SourceSpan};
+use petgraph::Graph;
+use petgraph::graph::NodeIndex;
 
 use super::lua;
 use crate::Result;
 use crate::fennel::Fennel;
 use crate::secret::SecretString;
 
+/// Edges point from dependency to dependent. Node weights are indices
+/// into `Pipeline::jobs`; source refs (e.g. `quire/push`) are not nodes.
+type JobGraph = Graph<usize, ()>;
+
 /// A registered job extracted from ci.fnl.
 ///
 /// Constructed via `Job::new`, which enforces the per-job validation
@@ -72,14 +78,66 @@ impl Job {
 /// Obtain via `pipeline::load`, which parses the Fennel source and
 /// validates the result. Holding a `Pipeline` is proof that the graph
 /// is sound.
+///
+/// Owns the Fennel/Lua VM so the registered `run_fn`s remain callable
+/// after `load` returns.
 pub struct Pipeline {
     jobs: Vec<Job>,
+    graph: JobGraph,
+    /// Job id → node index in `graph`, for O(1) lookup.
+    node_index: HashMap<String, NodeIndex>,
+    fennel: Fennel,
 }
 
 impl Pipeline {
     pub fn jobs(&self) -> &[Job] {
         &self.jobs
     }
+
+    /// Look up a job by id.
+    pub fn job(&self, id: &str) -> Option<&Job> {
+        self.node_index
+            .get(id)
+            .map(|&idx| &self.jobs[self.graph[idx]])
+    }
+
+    /// Borrow the underlying Fennel/Lua VM. Used by the executor to
+    /// install runtime state on the VM before invoking job `run_fn`s.
+    pub(crate) fn fennel(&self) -> &Fennel {
+        &self.fennel
+    }
+
+    /// Return job IDs in topological order — dependencies before
+    /// dependents. The pipeline is already validated as acyclic, so
+    /// this never fails.
+    pub(crate) fn topo_order(&self) -> Vec<&str> {
+        petgraph::algo::toposort(&self.graph, None)
+            .expect("pipeline is validated as acyclic")
+            .into_iter()
+            .map(|idx| self.jobs[self.graph[idx]].id.as_str())
+            .collect()
+    }
+}
+
+/// Build the dependency graph for `jobs`. Inputs that don't match a
+/// known job id are treated as source refs (e.g. `quire/push`) and
+/// don't get an edge — they're not nodes in this graph.
+fn build_graph(jobs: &[Job]) -> (JobGraph, HashMap<String, NodeIndex>) {
+    let mut graph = JobGraph::new();
+    let mut node_index = HashMap::new();
+    for (i, job) in jobs.iter().enumerate() {
+        let idx = graph.add_node(i);
+        node_index.insert(job.id.clone(), idx);
+    }
+    for job in jobs {
+        let dependent = node_index[&job.id];
+        for input in &job.inputs {
+            if let Some(&dependency) = node_index.get(input) {
+                graph.add_edge(dependency, dependent, ());
+            }
+        }
+    }
+    (graph, node_index)
 }
 
 /// Parse and validate a ci.fnl source string into a `Pipeline`.
@@ -89,13 +147,13 @@ impl Pipeline {
 /// found are gathered into a single `LoadError` carrying the source
 /// for miette to render with inline labels.
 pub(crate) fn load(
-    fennel: &Fennel,
     source: &str,
     filename: &str,
     display: &str,
     secrets: HashMap<String, SecretString>,
 ) -> Result<Pipeline> {
-    let results = lua::parse(fennel, source, filename, display, secrets)?;
+    let fennel = Fennel::new()?;
+    let results = lua::parse(&fennel, source, filename, display, secrets)?;
 
     let mut errors = Vec::new();
     let mut jobs = Vec::new();
@@ -106,12 +164,19 @@ pub(crate) fn load(
         }
     }
 
-    if let Err(post) = validate_post_graph(&jobs) {
+    let (graph, node_index) = build_graph(&jobs);
+
+    if let Err(post) = validate_post_graph(&jobs, &graph) {
         errors.extend(post);
     }
 
     if errors.is_empty() {
-        Ok(Pipeline { jobs })
+        Ok(Pipeline {
+            jobs,
+            graph,
+            node_index,
+            fennel,
+        })
     } else {
         Err(LoadError {
             src: NamedSource::new(display, source.to_string()),
@@ -199,43 +264,22 @@ pub struct LoadError {
 /// Per-job pre-graph rules (slash-in-id, empty inputs) run inside the
 /// `(ci.job …)` callback during `lua::parse`, so they are not re-checked
 /// here.
-fn validate_post_graph(jobs: &[Job]) -> std::result::Result<(), Vec<ValidationError>> {
+fn validate_post_graph(
+    jobs: &[Job],
+    graph: &JobGraph,
+) -> std::result::Result<(), Vec<ValidationError>> {
     let mut errors = Vec::new();
     let mut cycle_members: std::collections::HashSet<&str> = std::collections::HashSet::new();
 
-    // Rule 1: acyclic.
-    //
-    // Build a directed graph where edges point from dependency to
-    // dependent. Source refs (e.g. "quire/push") are not nodes.
-    let mut graph: petgraph::Graph<&str, ()> = petgraph::Graph::new();
-    let mut node_map: std::collections::HashMap<&str, petgraph::graph::NodeIndex> =
-        std::collections::HashMap::new();
-
-    for job in jobs {
-        let idx = graph.add_node(job.id.as_str());
-        node_map.insert(job.id.as_str(), idx);
-    }
-
-    for job in jobs {
-        let dependent = node_map[job.id.as_str()];
-        for input in &job.inputs {
-            if let Some(&dependency) = node_map.get(input.as_str()) {
-                graph.add_edge(dependency, dependent, ());
-            }
-        }
-    }
-
-    // Each non-trivial strongly connected component is a distinct cycle.
-    // A single-node SCC is only a cycle if it has a self-edge.
-    for scc in petgraph::algo::tarjan_scc(&graph) {
+    // Rule 1: acyclic. Each non-trivial strongly connected component
+    // is a distinct cycle. A single-node SCC is only a cycle if it has
+    // a self-edge.
+    for scc in petgraph::algo::tarjan_scc(graph) {
         let is_cycle = scc.len() > 1 || (scc.len() == 1 && graph.contains_edge(scc[0], scc[0]));
         if !is_cycle {
             continue;
         }
-        let mut members: Vec<&Job> = scc
-            .iter()
-            .filter_map(|&idx| jobs.iter().find(|j| j.id == graph[idx]))
-            .collect();
+        let mut members: Vec<&Job> = scc.iter().map(|&idx| &jobs[graph[idx]]).collect();
         members.sort_by(|a, b| a.id.cmp(&b.id));
         for j in &members {
             cycle_members.insert(j.id.as_str());
@@ -296,17 +340,12 @@ fn validate_post_graph(jobs: &[Job]) -> std::result::Result<(), Vec<ValidationEr
 mod tests {
     use super::*;
 
-    fn fennel() -> Fennel {
-        Fennel::new().expect("Fennel::new() should succeed")
-    }
-
     #[test]
     fn load_registers_a_job() {
-        let f = fennel();
         let source = r#"(local ci (require :quire.ci))
 (ci.job :test [:quire/push] (fn [_] nil))"#;
         let pipeline =
-            load(&f, source, "ci.fnl", "ci.fnl", HashMap::new()).expect("load should succeed");
+            load(source, "ci.fnl", "ci.fnl", HashMap::new()).expect("load should succeed");
         let jobs = pipeline.jobs();
         assert_eq!(jobs.len(), 1);
         assert_eq!(jobs[0].id, "test");
@@ -315,14 +354,13 @@ mod tests {
 
     #[test]
     fn load_registers_multiple_jobs() {
-        let f = fennel();
         let source = r#"
 (local ci (require :quire.ci))
 (ci.job :build [:quire/push] (fn [_] nil))
 (ci.job :test [:build] (fn [_] nil))
 "#;
         let pipeline =
-            load(&f, source, "ci.fnl", "ci.fnl", HashMap::new()).expect("load should succeed");
+            load(source, "ci.fnl", "ci.fnl", HashMap::new()).expect("load should succeed");
         let jobs = pipeline.jobs();
         assert_eq!(jobs.len(), 2);
         assert_eq!(jobs[0].id, "build");
@@ -333,7 +371,6 @@ mod tests {
 
     #[test]
     fn load_captures_source_line() {
-        let f = fennel();
         let source = "(local ci (require :quire.ci))
 (ci.job :first [:quire/push] (fn [_] nil))
 (ci.job :second [:quire/push] (fn [_] nil))
@@ -341,7 +378,7 @@ mod tests {
 
 (ci.job :sixth [:quire/push] (fn [_] nil))";
         let pipeline =
-            load(&f, source, "ci.fnl", "ci.fnl", HashMap::new()).expect("load should succeed");
+            load(source, "ci.fnl", "ci.fnl", HashMap::new()).expect("load should succeed");
         let lines: Vec<usize> = pipeline
             .jobs()
             .iter()
@@ -352,16 +389,16 @@ mod tests {
 
     #[test]
     fn load_errors_on_bad_fennel() {
-        let f = fennel();
-        let result = load(&f, "{:bad {:}", "ci.fnl", "ci.fnl", HashMap::new());
+        let result = load("{:bad {:}", "ci.fnl", "ci.fnl", HashMap::new());
         assert!(result.is_err(), "malformed Fennel should fail");
     }
 
     /// Parse a Fennel source into per-job results. Pre-graph rules
     /// run during parsing, so each entry is `Ok(Job)` or
-    /// `Err(ValidationError)`.
+    /// `Err(ValidationError)`. The local Fennel is dropped on return,
+    /// but the returned `Job`s only need their non-VM fields here.
     fn parse_results(source: &str) -> Vec<std::result::Result<Job, ValidationError>> {
-        let f = fennel();
+        let f = Fennel::new().expect("Fennel::new() should succeed");
         lua::parse(&f, source, "ci.fnl", "ci.fnl", HashMap::new()).expect("parse should succeed")
     }
 
@@ -374,6 +411,13 @@ mod tests {
             .collect()
     }
 
+    /// Run post-graph validation against `jobs`, building the dependency
+    /// graph the same way `load` does.
+    fn validate(jobs: &[Job]) -> std::result::Result<(), Vec<ValidationError>> {
+        let (graph, _) = build_graph(jobs);
+        validate_post_graph(jobs, &graph)
+    }
+
     #[test]
     fn validate_accepts_valid_config() {
         let jobs = parsed_jobs(
@@ -383,7 +427,7 @@ mod tests {
 (ci.job :test [:build :quire/push] (fn [_] nil))
 "#,
         );
-        assert!(validate_post_graph(&jobs).is_ok());
+        assert!(validate(&jobs).is_ok());
     }
 
     #[test]
@@ -395,7 +439,7 @@ mod tests {
 (ci.job :b [:a] (fn [_] nil))
 "#,
         );
-        let errs = validate_post_graph(&jobs).unwrap_err();
+        let errs = validate(&jobs).unwrap_err();
         assert!(
             errs.iter().any(|e| matches!(e, ValidationError::Cycle { cycle_jobs, .. } if cycle_jobs.contains(&"a".to_string()) && cycle_jobs.contains(&"b".to_string()))),
             "should report a cycle involving a and b: {errs:?}"
@@ -412,7 +456,7 @@ mod tests {
 (ci.job :clean [:quire/push] (fn [_] nil))
 "#,
         );
-        let errs = validate_post_graph(&jobs).unwrap_err();
+        let errs = validate(&jobs).unwrap_err();
         let cycle_errs: Vec<&Vec<String>> = errs
             .iter()
             .filter_map(|e| match e {
@@ -439,7 +483,7 @@ mod tests {
 (ci.job :d [:c :quire/push] (fn [_] nil))
 "#,
         );
-        let errs = validate_post_graph(&jobs).unwrap_err();
+        let errs = validate(&jobs).unwrap_err();
         let cycle_count = errs
             .iter()
             .filter(|e| matches!(e, ValidationError::Cycle { .. }))
@@ -486,7 +530,7 @@ mod tests {
 (ci.job :b [:a] (fn [_] nil))
 "#,
         );
-        let errs = validate_post_graph(&jobs).unwrap_err();
+        let errs = validate(&jobs).unwrap_err();
         let unreachable_count = errs
             .iter()
             .filter(|e| matches!(e, ValidationError::Unreachable { .. }))
@@ -506,7 +550,7 @@ mod tests {
             r#"(local ci (require :quire.ci))
 (ci.job :orphan [:does-not-exist] (fn [_] nil))"#,
         );
-        let errs = validate_post_graph(&jobs).unwrap_err();
+        let errs = validate(&jobs).unwrap_err();
         assert!(
             errs.iter().any(
                 |e| matches!(e, ValidationError::Unreachable { job_id, .. } if job_id == "orphan")
diff --git a/src/ci/run.rs b/src/ci/run.rs
index 4bca1bb..c9f941e 100644
--- a/src/ci/run.rs
+++ b/src/ci/run.rs
@@ -6,9 +6,12 @@
 //! `rename` operations.
 
 use std::path::{Path, PathBuf};
+use std::rc::Rc;
 
 use jiff::Timestamp;
 
+use super::lua::{RuntimeState, ShOutput};
+use super::pipeline::Pipeline;
 use crate::{Error, Result};
 
 /// The state of a CI run.
@@ -204,13 +207,17 @@ impl Runs {
 
 /// A CI run on disk.
 ///
-/// Owns the path to the run directory. Tracks current state so that
+/// Owns the path to the run directory and the in-memory execution
+/// state used while driving a pipeline. Tracks current state so that
 /// `transition` can move the directory in one call.
-#[derive(Debug)]
 pub struct Run {
     base: PathBuf,
     state: RunState,
     id: String,
+    /// Per-execution state shared with the Lua bridge: tracks the
+    /// currently-running job and accumulates per-job `(ci.sh …)` output.
+    /// Cloned (Rc) into Lua app data when `execute` runs.
+    runtime: Rc<RuntimeState>,
 }
 
 impl Run {
@@ -235,12 +242,67 @@ impl Run {
     /// `pending/`, `active/`). Returns an error if `meta.yml` or
     /// `times.yml` are missing or unreadable.
     pub fn open(base: PathBuf, state: RunState, id: String) -> Result<Self> {
-        let run = Self { base, state, id };
+        let run = Self {
+            base,
+            state,
+            id,
+            runtime: Rc::new(RuntimeState::default()),
+        };
         run.read_meta()?;
         run.read_times()?;
         Ok(run)
     }
 
+    /// Drive `pipeline` to completion through this run.
+    ///
+    /// Topo-sorts the jobs, transitions Pending → Active, then invokes
+    /// each `run_fn` in dependency order. `(ci.sh …)` calls record their
+    /// captured output under the current job — readable via [`outputs`]
+    /// after `execute` returns. The run finishes in `Complete` if every
+    /// job's `run_fn` returned without error, otherwise `Failed`.
+    ///
+    /// Source-ref filtering (e.g. running only `quire/push`-reachable
+    /// jobs) is not yet implemented; for now every validated job runs.
+    pub fn execute(&mut self, pipeline: &Pipeline) -> Result<()> {
+        pipeline.fennel().lua().set_app_data(self.runtime.clone());
+
+        let order: Vec<String> = pipeline
+            .topo_order()
+            .into_iter()
+            .map(String::from)
+            .collect();
+
+        self.transition(RunState::Active)?;
+
+        for job_id in &order {
+            let job = pipeline
+                .job(job_id)
+                .expect("topo_order returned a job id not in pipeline");
+
+            self.runtime.enter_job(&job.id);
+            let result = job.run_fn.call::<mlua::Value>(());
+            self.runtime.leave_job();
+
+            if let Err(e) = result {
+                self.transition(RunState::Failed)?;
+                return Err(Error::JobFailed {
+                    job: job.id.clone(),
+                    source: Box::new(e),
+                });
+            }
+        }
+
+        self.transition(RunState::Complete)?;
+        Ok(())
+    }
+
+    /// Snapshot the `(ci.sh …)` outputs recorded for `job_id` during
+    /// the most recent `execute` call. Empty if the job hasn't run or
+    /// produced no output.
+    pub fn outputs(&self, job_id: &str) -> Vec<ShOutput> {
+        self.runtime.outputs(job_id)
+    }
+
     /// Transition the run from its current state to a new state.
     ///
     /// Moves the run directory between state parent directories and stamps
@@ -487,6 +549,7 @@ mod tests {
             base: PathBuf::from("/tmp/quire-test-runs/test.git"),
             state: RunState::Pending,
             id: uuid::Uuid::now_v7().to_string(),
+            runtime: Rc::new(RuntimeState::default()),
         };
 
         let result = run.transition(RunState::Active);
@@ -576,4 +639,78 @@ mod tests {
         let loaded_meta = run.read_meta().expect("read meta");
         assert_eq!(loaded_meta, test_meta());
     }
+
+    fn load(source: &str) -> Pipeline {
+        super::super::pipeline::load(source, "ci.fnl", "ci.fnl", std::collections::HashMap::new())
+            .expect("load should succeed")
+    }
+
+    #[test]
+    fn execute_records_outputs_per_job() {
+        let (_dir, quire) = tmp_quire();
+        let runs = test_runs(&quire);
+        let mut run = runs.create(&test_meta()).expect("create");
+
+        let pipeline = load(
+            r#"(local ci (require :quire.ci))
+(ci.job :a [:quire/push] (fn [_] (ci.sh ["echo" "from-a"])))
+(ci.job :b [:a] (fn [_] (ci.sh ["echo" "from-b"])))"#,
+        );
+
+        run.execute(&pipeline).expect("execute");
+
+        assert_eq!(run.state(), RunState::Complete);
+
+        let a = run.outputs("a");
+        let b = run.outputs("b");
+        assert_eq!(a.len(), 1);
+        assert_eq!(a[0].stdout, "from-a\n");
+        assert_eq!(b.len(), 1);
+        assert_eq!(b[0].stdout, "from-b\n");
+    }
+
+    #[test]
+    fn execute_runs_jobs_in_topo_order() {
+        // `b` depends on `a`, but the registration order puts `b` first.
+        // Topo-sorted execution must run `a` before `b`.
+        let (_dir, quire) = tmp_quire();
+        let runs = test_runs(&quire);
+        let mut run = runs.create(&test_meta()).expect("create");
+
+        let log = quire.base_dir().join("order.log");
+        let log_str = log.to_string_lossy();
+        let source = format!(
+            r#"(local ci (require :quire.ci))
+(ci.job :b [:a] (fn [_] (ci.sh (.. "echo b >> {log}"))))
+(ci.job :a [:quire/push] (fn [_] (ci.sh (.. "echo a >> {log}"))))"#,
+            log = log_str
+        );
+        let pipeline = load(&source);
+
+        run.execute(&pipeline).expect("execute");
+
+        let contents = fs_err::read_to_string(&log).expect("read log");
+        assert_eq!(contents, "a\nb\n");
+    }
+
+    #[test]
+    fn execute_stops_on_first_failure_and_transitions_failed() {
+        let (_dir, quire) = tmp_quire();
+        let runs = test_runs(&quire);
+        let mut run = runs.create(&test_meta()).expect("create");
+
+        let pipeline = load(
+            r#"(local ci (require :quire.ci))
+(ci.job :a [:quire/push] (fn [_] (error "boom")))
+(ci.job :b [:a] (fn [_] (ci.sh ["echo" "should-not-run"])))"#,
+        );
+
+        let err = run.execute(&pipeline).expect_err("expected failure");
+        assert!(matches!(err, Error::JobFailed { ref job, .. } if job == "a"));
+        assert_eq!(run.state(), RunState::Failed);
+        assert!(
+            run.outputs("b").is_empty(),
+            "b should not have run after a failed"
+        );
+    }
 }
diff --git a/src/error.rs b/src/error.rs
index 69cf2a0..9e325be 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -30,6 +30,13 @@ pub enum Error {
     #[error("invalid run transition: {from:?} -> {to:?}")]
     InvalidTransition { from: RunState, to: RunState },
 
+    #[error("job '{job}' failed")]
+    JobFailed {
+        job: String,
+        #[source]
+        source: Box<mlua::Error>,
+    },
+
     #[error("git error: {0}")]
     Git(String),