Collapse Pipeline's job storage into a single graph
Pipeline used to hold each Job three ways: a Vec<Job>, a
Graph<usize, ()> whose node weights were indices into the Vec, and a
HashMap<String, NodeIndex> for id lookups. Two hops for every
NodeIndex → Job access (NodeIndex → usize → Vec[usize]) and the
ownership story was split between the Vec and the graph.
Switch to Graph<Job, ()>: nodes own the Job values directly, edges
still encode the dependency relation. The Vec<Job> field disappears,
the usize indirection disappears, and id lookups go via a single
HashMap<String, NodeIndex> renamed `by_id`.
API changes:
- `pipeline.jobs()` now returns `Vec<&Job>` in topological order
instead of `&[Job]` in registration order. Nothing in production
relied on registration order, and topo order is what every real
caller (executor, validate CLI) actually wants. Removes the need
for a separate `topo_order()` accessor.
- `pipeline.topo_order() -> Vec<&str>` is gone; iterate
`pipeline.jobs()` and read `.id` instead. Mirror change on
Runtime: `runtime.topo_order()` becomes `runtime.jobs()`.
- Adds `pipeline.job_count()` for the "is the pipeline empty?"
check without allocating the Vec.
`build_graph` now consumes its `Vec<Job>` argument and folds each Job
into a graph node, then walks the inserted indices to wire edges from
each job's input list. `validate_post_graph` takes only the graph
since node weights are Jobs.
Tests pre/post-graph that asserted on registration-order indexing
(`jobs[0].id == "build"`) are rewritten to either look up by id or
sort before comparing — neither test was actually validating ordering
behavior, only happening to depend on it.
Assisted-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
diff --git a/quire-ci/src/main.rs b/quire-ci/src/main.rs
index 0eb33fe..11a9a2e 100644
--- a/quire-ci/src/main.rs
+++ b/quire-ci/src/main.rs
@@ -271,8 +271,7 @@ fn init_sentry(
fn validate(workspace: PathBuf) -> miette::Result<()> {
let pipeline = compile_at(&workspace)?;
- let jobs = pipeline.jobs();
- if jobs.is_empty() {
+ if pipeline.job_count() == 0 {
println!("No jobs registered.");
return Ok(());
}
@@ -281,12 +280,10 @@ fn validate(workspace: PathBuf) -> miette::Result<()> {
println!("Image: {image}");
}
- let topo = pipeline.topo_order();
println!("Jobs (topological order):");
- for id in &topo {
- let job = pipeline.job(id).expect("topo_order returns valid ids");
+ for job in pipeline.jobs() {
let inputs = job.inputs.join(", ");
- println!(" {id} <- [{inputs}]");
+ println!(" {} <- [{inputs}]", job.id);
}
println!("\nAll validations passed.");
@@ -340,11 +337,7 @@ fn run_pipeline(
) -> miette::Result<()> {
let pipeline = compile_at(&workspace)?;
- let job_ids: Vec<String> = pipeline
- .topo_order()
- .into_iter()
- .map(|s| s.to_string())
- .collect();
+ let job_ids: Vec<String> = pipeline.jobs().iter().map(|j| j.id.clone()).collect();
if job_ids.is_empty() {
return Ok(());
}
@@ -415,7 +408,7 @@ fn run_pipeline(
let run_fn = runtime
.job(job_id)
- .expect("topo_order returns valid ids")
+ .expect("pipeline.jobs() returns valid ids")
.run_fn
.clone();
diff --git a/quire-core/src/ci/pipeline.rs b/quire-core/src/ci/pipeline.rs
index e4b1167..5f08981 100644
--- a/quire-core/src/ci/pipeline.rs
+++ b/quire-core/src/ci/pipeline.rs
@@ -81,9 +81,10 @@ pub enum Diagnostic {
Structure(#[from] StructureError),
}
-/// Edges point from dependency to dependent. Node weights are indices
-/// into `Pipeline::jobs`; source refs (e.g. `quire/push`) are not nodes.
-type JobGraph = Graph<usize, ()>;
+/// Edges point from dependency to dependent. Node weights are the
+/// `Job` values themselves; source refs (e.g. `quire/push`) are not
+/// nodes in this graph.
+type JobGraph = Graph<Job, ()>;
/// A registered job extracted from ci.fnl.
///
@@ -180,10 +181,12 @@ impl Job {
/// Owns the Fennel/Lua VM so the registered `run_fn`s remain callable
/// after `compile` returns.
pub struct Pipeline {
- jobs: Vec<Job>,
+ /// Jobs and dependencies in one structure: nodes own `Job` values,
+ /// edges go from dependency to dependent. Replaces the old pair of
+ /// `Vec<Job>` plus `Graph<usize, ()>` (node weights as vec indices).
graph: JobGraph,
- /// Job id → node index in `graph`, for O(1) lookup.
- node_index: HashMap<String, NodeIndex>,
+ /// Job id → node index, for O(1) lookup by id.
+ by_id: HashMap<String, NodeIndex>,
fennel: Fennel,
/// Container image declared via `(ci.image "...")`, if any.
image: Option<String>,
@@ -200,15 +203,26 @@ pub struct Pipeline {
}
impl Pipeline {
- pub fn jobs(&self) -> &[Job] {
- &self.jobs
+ /// Jobs in topological order — dependencies before dependents.
+ /// The pipeline is validated as acyclic, so toposort never fails.
+ /// This is the only order callers should iterate in; registration
+ /// order isn't exposed because nothing relies on it.
+ pub fn jobs(&self) -> Vec<&Job> {
+ petgraph::algo::toposort(&self.graph, None)
+ .expect("pipeline is validated as acyclic")
+ .into_iter()
+ .map(|idx| &self.graph[idx])
+ .collect()
+ }
+
+ /// Number of registered jobs.
+ pub fn job_count(&self) -> usize {
+ self.graph.node_count()
}
/// Look up a job by id.
pub fn job(&self, id: &str) -> Option<&Job> {
- self.node_index
- .get(id)
- .map(|&idx| &self.jobs[self.graph[idx]])
+ self.by_id.get(id).map(|&idx| &self.graph[idx])
}
/// Borrow the underlying Fennel/Lua VM.
@@ -237,17 +251,6 @@ impl Pipeline {
&self.source_name
}
- /// Return job IDs in topological order — dependencies before
- /// dependents. The pipeline is already validated as acyclic, so
- /// this never fails.
- pub fn topo_order(&self) -> Vec<&str> {
- petgraph::algo::toposort(&self.graph, None)
- .expect("pipeline is validated as acyclic")
- .into_iter()
- .map(|idx| self.jobs[self.graph[idx]].id.as_str())
- .collect()
- }
-
/// For each job, the set of input names — direct and transitive,
/// including source refs — reachable through the input graph. The
/// job's own id is not included.
@@ -262,22 +265,21 @@ impl Pipeline {
pub fn transitive_inputs(&self) -> HashMap<String, HashSet<String>> {
let reversed = Reversed(&self.graph);
let mut result: HashMap<String, HashSet<String>> = HashMap::new();
- for job in &self.jobs {
- let start = self.node_index[&job.id];
+ for (start_id, &start) in &self.by_id {
let mut reachable = HashSet::new();
let mut bfs = Bfs::new(reversed, start);
while let Some(idx) = bfs.next(reversed) {
- let visited = &self.jobs[self.graph[idx]];
+ let visited = &self.graph[idx];
if idx != start {
reachable.insert(visited.id.clone());
}
for input in &visited.inputs {
- if !self.node_index.contains_key(input) {
+ if !self.by_id.contains_key(input) {
reachable.insert(input.clone());
}
}
}
- result.insert(job.id.clone(), reachable);
+ result.insert(start_id.clone(), reachable);
}
result
}
@@ -289,31 +291,37 @@ impl Pipeline {
/// full helper machinery (which doesn't exist yet).
#[doc(hidden)]
pub fn replace_first_run_fn(&mut self, run_fn: RunFn) {
- if let Some(job) = self.jobs.first_mut() {
+ if let Some(job) = self.graph.node_weights_mut().next() {
job.run_fn = run_fn;
}
}
}
-/// Build the dependency graph for `jobs`. Inputs that don't match a
-/// known job id are treated as source refs (e.g. `quire/push`) and
-/// don't get an edge — they're not nodes in this graph.
-fn build_graph(jobs: &[Job]) -> (JobGraph, HashMap<String, NodeIndex>) {
+/// Build the dependency graph by consuming `jobs` into graph nodes.
+/// Inputs that don't match a known job id are treated as source refs
+/// (e.g. `quire/push`) and don't get an edge — they're not nodes in
+/// this graph.
+fn build_graph(jobs: Vec<Job>) -> (JobGraph, HashMap<String, NodeIndex>) {
let mut graph = JobGraph::new();
- let mut node_index = HashMap::new();
- for (i, job) in jobs.iter().enumerate() {
- let idx = graph.add_node(i);
- node_index.insert(job.id.clone(), idx);
- }
+ let mut by_id = HashMap::with_capacity(jobs.len());
for job in jobs {
- let dependent = node_index[&job.id];
- for input in &job.inputs {
- if let Some(&dependency) = node_index.get(input) {
+ let id = job.id.clone();
+ let idx = graph.add_node(job);
+ by_id.insert(id, idx);
+ }
+ // `node_indices()` walks insertion order, giving deterministic edge
+ // ordering. Snapshotting to a Vec releases the immutable graph
+ // borrow before we mutate it via `add_edge`.
+ let dependents: Vec<NodeIndex> = graph.node_indices().collect();
+ for dependent in dependents {
+ let inputs = graph[dependent].inputs.clone();
+ for input in inputs {
+ if let Some(&dependency) = by_id.get(&input) {
graph.add_edge(dependency, dependent, ());
}
}
}
- (graph, node_index)
+ (graph, by_id)
}
/// Compute a span covering the given 1-indexed line in `source`.
@@ -398,9 +406,9 @@ pub fn compile(source: &str, name: &str) -> CompileResult<Pipeline> {
let fennel = Fennel::new()?;
let Registrations { jobs, image } = registration::register(&fennel, source, name)?;
- let (graph, node_index) = build_graph(&jobs);
+ let (graph, by_id) = build_graph(jobs);
- if let Err(errors) = validate_post_graph(&jobs, &graph) {
+ if let Err(errors) = validate_post_graph(&graph) {
return Err(PipelineError {
src: NamedSource::new(name, source.to_string()),
diagnostics: errors.into_iter().map(Diagnostic::Structure).collect(),
@@ -409,9 +417,8 @@ pub fn compile(source: &str, name: &str) -> CompileResult<Pipeline> {
}
Ok(Pipeline {
- jobs,
graph,
- node_index,
+ by_id,
fennel,
image,
source: source.to_string(),
@@ -425,10 +432,7 @@ pub fn compile(source: &str, name: &str) -> CompileResult<Pipeline> {
/// Per-job pre-graph rules (slash-in-id, empty inputs) run inside the
/// `(ci.job …)` callback during `registration::register`, so they are
/// not re-checked here.
-fn validate_post_graph(
- jobs: &[Job],
- graph: &JobGraph,
-) -> std::result::Result<(), Vec<StructureError>> {
+fn validate_post_graph(graph: &JobGraph) -> std::result::Result<(), Vec<StructureError>> {
let mut errors = Vec::new();
let mut cycle_members: std::collections::HashSet<&str> = std::collections::HashSet::new();
@@ -440,7 +444,7 @@ fn validate_post_graph(
if !is_cycle {
continue;
}
- let mut members: Vec<&Job> = scc.iter().map(|&idx| &jobs[graph[idx]]).collect();
+ let mut members: Vec<&Job> = scc.iter().map(|&idx| &graph[idx]).collect();
members.sort_by(|a, b| a.id.cmp(&b.id));
for j in &members {
cycle_members.insert(j.id.as_str());
@@ -457,7 +461,7 @@ fn validate_post_graph(
// typos like `:quire/posh` don't silently make a job "reachable."
let is_source = |name: &str| name.starts_with("quire/");
- for job in jobs {
+ for job in graph.node_weights() {
// Cycle members are already reported via Cycle; skip them here so
// a single bad cycle doesn't generate N+1 errors.
if cycle_members.contains(job.id.as_str()) {
@@ -475,7 +479,7 @@ fn validate_post_graph(
found_source = true;
break;
}
- if let Some(upstream) = jobs.iter().find(|j| j.id == name) {
+ if let Some(upstream) = graph.node_weights().find(|j| j.id == name) {
for input in &upstream.inputs {
stack.push(input.as_str());
}
@@ -520,6 +524,7 @@ mod tests {
(ci.job :test [:build] (fn [] nil))
"#;
let pipeline = compile(source, "ci.fnl").expect("compile should succeed");
+ // Topological order: build (depends only on quire/push) before test.
let jobs = pipeline.jobs();
assert_eq!(jobs.len(), 2);
assert_eq!(jobs[0].id, "build");
@@ -537,11 +542,14 @@ mod tests {
(ci.job :sixth [:quire/push] (fn [] nil))";
let pipeline = compile(source, "ci.fnl").expect("compile should succeed");
- let lines: Vec<usize> = pipeline
+ let mut lines: Vec<usize> = pipeline
.jobs()
.iter()
.map(|j| 1 + source[..j.span.offset()].matches('\n').count())
.collect();
+ // All three jobs depend only on quire/push, so topo order
+ // among them isn't fixed — sort before comparing.
+ lines.sort();
assert_eq!(lines, vec![2, 3, 6]);
}
@@ -582,9 +590,9 @@ mod tests {
/// Run post-graph validation against `jobs`, building the dependency
/// graph the same way `compile` does.
- fn validate(jobs: &[Job]) -> std::result::Result<(), Vec<StructureError>> {
+ fn validate(jobs: Vec<Job>) -> std::result::Result<(), Vec<StructureError>> {
let (graph, _) = build_graph(jobs);
- validate_post_graph(jobs, &graph)
+ validate_post_graph(&graph)
}
#[test]
@@ -596,7 +604,7 @@ mod tests {
(ci.job :test [:build :quire/push] (fn [] nil))
"#,
);
- assert!(validate(&jobs).is_ok());
+ assert!(validate(jobs).is_ok());
}
#[test]
@@ -608,7 +616,7 @@ mod tests {
(ci.job :b [:a] (fn [] nil))
"#,
);
- let errs = validate(&jobs).unwrap_err();
+ let errs = validate(jobs).unwrap_err();
assert!(
errs.iter().any(|e| matches!(e, StructureError::Cycle { cycle_jobs, .. } if cycle_jobs.contains(&"a".to_string()) && cycle_jobs.contains(&"b".to_string()))),
"should report a cycle involving a and b: {errs:?}"
@@ -625,7 +633,7 @@ mod tests {
(ci.job :clean [:quire/push] (fn [] nil))
"#,
);
- let errs = validate(&jobs).unwrap_err();
+ let errs = validate(jobs).unwrap_err();
let cycle_errs: Vec<&Vec<String>> = errs
.iter()
.filter_map(|e| match e {
@@ -652,7 +660,7 @@ mod tests {
(ci.job :d [:c :quire/push] (fn [] nil))
"#,
);
- let errs = validate(&jobs).unwrap_err();
+ let errs = validate(jobs).unwrap_err();
let cycle_count = errs
.iter()
.filter(|e| matches!(e, StructureError::Cycle { .. }))
@@ -714,7 +722,7 @@ mod tests {
(ci.job :b [:a] (fn [] nil))
"#,
);
- let errs = validate(&jobs).unwrap_err();
+ let errs = validate(jobs).unwrap_err();
let unreachable_count = errs
.iter()
.filter(|e| matches!(e, StructureError::Unreachable { .. }))
@@ -734,7 +742,7 @@ mod tests {
r#"(local ci (require :quire.ci))
(ci.job :orphan [:does-not-exist] (fn [] nil))"#,
);
- let errs = validate(&jobs).unwrap_err();
+ let errs = validate(jobs).unwrap_err();
assert!(
errs.iter().any(
|e| matches!(e, StructureError::Unreachable { job_id, .. } if job_id == "orphan")
@@ -756,7 +764,7 @@ mod tests {
(ci.job :c [:a] (fn [] nil))
(ci.job :d [:b :c] (fn [] nil))"#,
);
- assert!(validate(&jobs).is_ok());
+ assert!(validate(jobs).is_ok());
}
#[test]
@@ -770,7 +778,7 @@ mod tests {
(local ci (require :quire.ci))
(ci.job :orphan [:a :a] (fn [] nil))"#,
);
- let errs = validate(&jobs).unwrap_err();
+ let errs = validate(jobs).unwrap_err();
assert!(
errs.iter()
.any(|e| matches!(e, StructureError::Unreachable { .. })),
diff --git a/quire-core/src/ci/runtime.rs b/quire-core/src/ci/runtime.rs
index df4c9bb..c72de72 100644
--- a/quire-core/src/ci/runtime.rs
+++ b/quire-core/src/ci/runtime.rs
@@ -190,9 +190,9 @@ impl Runtime {
self.pipeline.fennel().lua()
}
- /// The topo-sorted job IDs in execution order.
- pub fn topo_order(&self) -> Vec<&str> {
- self.pipeline.topo_order()
+ /// Jobs in execution (topological) order.
+ pub fn jobs(&self) -> Vec<&Job> {
+ self.pipeline.jobs()
}
/// Look up a job by id.
@@ -675,7 +675,13 @@ mod tests {
secrets: HashMap<String, SecretString>,
) -> (Rc<Runtime>, mlua::Function, RuntimeHandle) {
let pipeline = compile(source, "ci.fnl").expect("compile should succeed");
- let run_fn = match pipeline.jobs()[0].run_fn.clone() {
+ let run_fn = match pipeline
+ .jobs()
+ .first()
+ .expect("test pipeline has one job")
+ .run_fn
+ .clone()
+ {
RunFn::Lua(f) => f,
RunFn::Rust(_) => panic!("expected RunFn::Lua for test setup"),
};
diff --git a/quire-server/src/bin/quire/commands/ci.rs b/quire-server/src/bin/quire/commands/ci.rs
index bc11355..dbffad0 100644
--- a/quire-server/src/bin/quire/commands/ci.rs
+++ b/quire-server/src/bin/quire/commands/ci.rs
@@ -19,14 +19,13 @@ pub async fn validate(maybe_sha: Option<&str>) -> Result<()> {
return Ok(());
};
- let jobs = pipeline.jobs();
- if jobs.is_empty() {
+ if pipeline.job_count() == 0 {
println!("No jobs registered.");
return Ok(());
}
println!("Jobs:");
- for job in jobs {
+ for job in pipeline.jobs() {
let inputs = job.inputs.join(", ");
println!(" {} ← [{}]", job.id, inputs);
}
diff --git a/quire-server/src/ci/mod.rs b/quire-server/src/ci/mod.rs
index 1c8fbe6..18ad367 100644
--- a/quire-server/src/ci/mod.rs
+++ b/quire-server/src/ci/mod.rs
@@ -361,8 +361,8 @@ mod tests {
.pipeline(&commit)
.expect("pipeline should succeed")
.expect("should have pipeline");
- assert_eq!(pipeline.jobs().len(), 1);
- assert_eq!(pipeline.jobs()[0].id, "build");
+ assert_eq!(pipeline.job_count(), 1);
+ assert!(pipeline.job("build").is_some());
}
#[test]