Make :executor :quire-ci populate jobs and sh_events
Subprocess gets `--out-dir <run_dir>` (per-sh logs at the same path the
Host executor produces) and `--events <run_dir>/events.jsonl` (the
structured stream). Server ingests the file post-run in two passes —
jobs first, sh_events second, since the FK requires the parent row.

Event types moved from quire-ci to quire-core so producer and consumer
share the wire schema.

Assisted-by: Claude Opus 4.7 via Claude Code
change rzxlrlmpytoruroxtxswrwwnonsoouqn
commit 4efc449086fcb8fe47de20a09453b75cd7f9535d
author Alpha Chen <alpha@kejadlen.dev>
date
parent qrzvqtzw
diff --git a/quire-ci/src/main.rs b/quire-ci/src/main.rs
index 9f424ce..2045bc4 100644
--- a/quire-ci/src/main.rs
+++ b/quire-ci/src/main.rs
@@ -1,4 +1,3 @@
-mod event;
 mod sink;
 
 use std::cell::RefCell;
@@ -9,11 +8,11 @@ use std::rc::Rc;
 
 use clap::Parser;
 use miette::IntoDiagnostic;
+use quire_core::ci::event::{Event, EventKind, JobOutcome};
 use quire_core::ci::pipeline::{self, Pipeline, RunFn};
 use quire_core::ci::run::RunMeta;
 use quire_core::ci::runtime::{Runtime, RuntimeError, RuntimeEvent, RuntimeHandle};
 
-use crate::event::{Event, EventKind, JobOutcome};
 use crate::sink::{EventSink, JsonlSink, NullSink};
 
 /// Run a quire CI pipeline locally.
diff --git a/quire-ci/src/sink.rs b/quire-ci/src/sink.rs
index 2d9ece4..5306208 100644
--- a/quire-ci/src/sink.rs
+++ b/quire-ci/src/sink.rs
@@ -7,7 +7,7 @@
 
 use std::io::{self, Write};
 
-use crate::event::Event;
+use quire_core::ci::event::Event;
 
 /// A consumer of pipeline events.
 pub trait EventSink {
@@ -54,7 +54,7 @@ impl<W: Write> EventSink for JsonlSink<W> {
 mod tests {
     use super::*;
 
-    use crate::event::EventKind;
+    use quire_core::ci::event::EventKind;
 
     fn sample_started(job_id: &str) -> Event {
         Event {
diff --git a/quire-ci/src/event.rs b/quire-core/src/ci/event.rs
similarity index 100%
rename from quire-ci/src/event.rs
rename to quire-core/src/ci/event.rs
diff --git a/quire-core/src/ci/mod.rs b/quire-core/src/ci/mod.rs
index 2541629..d2dcc9c 100644
--- a/quire-core/src/ci/mod.rs
+++ b/quire-core/src/ci/mod.rs
@@ -5,6 +5,7 @@
 //! (where `quire-ci` invokes them) and on the server (where the
 //! orchestrator drives them).
 
+pub mod event;
 pub mod logs;
 pub mod mirror;
 pub mod pipeline;
diff --git a/quire-server/src/ci/error.rs b/quire-server/src/ci/error.rs
index 9fcfa1f..ce3e8e8 100644
--- a/quire-server/src/ci/error.rs
+++ b/quire-server/src/ci/error.rs
@@ -66,6 +66,13 @@ pub enum Error {
 
     #[error("quire-ci exited with status {exit:?}")]
     QuireCiExit { exit: Option<i32> },
+
+    #[error("failed to parse quire-ci event stream at {path}")]
+    EventStreamParse {
+        path: std::path::PathBuf,
+        #[source]
+        source: serde_json::Error,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
diff --git a/quire-server/src/ci/run.rs b/quire-server/src/ci/run.rs
index bc05621..c1e624c 100644
--- a/quire-server/src/ci/run.rs
+++ b/quire-server/src/ci/run.rs
@@ -314,15 +314,23 @@ impl Run {
 
     /// Run the pipeline by shelling out to the `quire-ci` binary.
     ///
-    /// Combined stdout+stderr is captured to `<run_dir>/quire-ci.log`.
-    /// Run finishes `Complete` on exit 0, `Failed` otherwise. Per-job
-    /// and per-sh database records are not written in this path —
-    /// quire-ci doesn't yet emit a structured report for the
-    /// orchestrator to ingest.
+    /// Layout under the run dir on disk:
+    /// * `quire-ci.log` — combined stdout+stderr of the subprocess.
+    /// * `events.jsonl` — structured event stream (one JSON object per
+    ///   line). Ingested into `jobs` and `sh_events` after the
+    ///   subprocess exits.
+    /// * `jobs/<job>/sh-<n>.log` — per-sh CRI logs, written by quire-ci
+    ///   via `--out-dir`. Same layout the Host executor produces.
+    ///
+    /// Run finishes `Complete` on exit 0, `Failed` otherwise. The DB
+    /// rows are written even on failure so the web UI can render
+    /// partial progress.
     pub fn execute_via_quire_ci(mut self, workspace: &Path) -> Result<()> {
         self.transition(RunState::Active)?;
 
-        let log_path = self.path().join("quire-ci.log");
+        let run_dir = self.path();
+        let log_path = run_dir.join("quire-ci.log");
+        let events_path = run_dir.join("events.jsonl");
         // fs_err for the path-bearing IO error; unwrap to std::fs::File so
         // it's convertible into Stdio.
         let log = fs_err::File::create(&log_path)?.into_parts().0;
@@ -331,6 +339,7 @@ impl Run {
         tracing::info!(
             run_id = %self.id,
             log = %log_path.display(),
+            events = %events_path.display(),
             "dispatching run to quire-ci",
         );
 
@@ -338,6 +347,10 @@ impl Run {
             .arg("run")
             .arg("--workspace")
             .arg(workspace)
+            .arg("--out-dir")
+            .arg(&run_dir)
+            .arg("--events")
+            .arg(&events_path)
             .stdout(std::process::Stdio::from(log))
             .stderr(std::process::Stdio::from(log_clone))
             .status()
@@ -347,6 +360,18 @@ impl Run {
                 source,
             })?;
 
+        // Ingest events whether or not the run succeeded — partial
+        // results are still useful in the UI. A failure to read or
+        // parse the file goes to the log but doesn't mask the run's
+        // own pass/fail outcome.
+        if let Err(e) = self.ingest_events(&events_path) {
+            tracing::warn!(
+                run_id = %self.id,
+                error = %e,
+                "failed to ingest quire-ci events; jobs/sh_events rows may be incomplete"
+            );
+        }
+
         if !status.success() {
             self.transition(RunState::Failed)?;
             return Err(Error::QuireCiExit {
@@ -358,6 +383,81 @@ impl Run {
         Ok(())
     }
 
+    /// Read `events.jsonl` and replay it into the database.
+    ///
+    /// Done in two passes because `sh_events` has a foreign key on
+    /// `(run_id, job_id)` in `jobs`, and the wire format interleaves
+    /// sh events with their owning job. Pass 1 inserts every job row
+    /// (paired by `job_id`); pass 2 inserts sh events.
+    fn ingest_events(&self, path: &Path) -> Result<()> {
+        use quire_core::ci::event::{Event, EventKind, JobOutcome};
+
+        let bytes = match fs_err::read(path) {
+            Ok(b) => b,
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
+            Err(e) => return Err(e.into()),
+        };
+        let events: Vec<Event> = bytes
+            .split(|b| *b == b'\n')
+            .filter(|line| !line.is_empty())
+            .map(serde_json::from_slice)
+            .collect::<std::result::Result<_, _>>()
+            .map_err(|e| Error::EventStreamParse {
+                path: path.to_path_buf(),
+                source: e,
+            })?;
+
+        let db = crate::db::open(&self.db_path)?;
+
+        // Pass 1: jobs rows. Pair JobStarted with JobFinished by job_id.
+        let mut pending_jobs: HashMap<&str, i64> = HashMap::new();
+        for event in &events {
+            match &event.kind {
+                EventKind::JobStarted { job_id } => {
+                    pending_jobs.insert(job_id.as_str(), event.at_ms);
+                }
+                EventKind::JobFinished { job_id, outcome } => {
+                    let started_at = pending_jobs.remove(job_id.as_str()).unwrap_or(event.at_ms);
+                    let state = match outcome {
+                        JobOutcome::Complete => "complete",
+                        JobOutcome::Failed => "failed",
+                    };
+                    db.execute(
+                        "INSERT INTO jobs (run_id, job_id, state, started_at_ms, finished_at_ms) \
+                         VALUES (?1, ?2, ?3, ?4, ?5)",
+                        rusqlite::params![&self.id, job_id, state, started_at, event.at_ms],
+                    )?;
+                }
+                EventKind::ShStarted { .. } | EventKind::ShFinished { .. } => {}
+            }
+        }
+
+        // Pass 2: sh_events rows. Pair ShStarted with ShFinished by job_id
+        // (sequential within a run-fn, so a single buffer slot per job
+        // is enough).
+        let mut pending_sh: HashMap<&str, (i64, &str)> = HashMap::new();
+        for event in &events {
+            match &event.kind {
+                EventKind::ShStarted { job_id, cmd } => {
+                    pending_sh.insert(job_id.as_str(), (event.at_ms, cmd.as_str()));
+                }
+                EventKind::ShFinished { job_id, exit_code } => {
+                    let Some((started_at, cmd)) = pending_sh.remove(job_id.as_str()) else {
+                        continue;
+                    };
+                    db.execute(
+                        "INSERT INTO sh_events (run_id, job_id, started_at_ms, finished_at_ms, exit_code, cmd) \
+                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
+                        rusqlite::params![&self.id, job_id, started_at, event.at_ms, exit_code, cmd],
+                    )?;
+                }
+                EventKind::JobStarted { .. } | EventKind::JobFinished { .. } => {}
+            }
+        }
+
+        Ok(())
+    }
+
     /// Insert sh_events DB rows from the runtime's captured outputs and
     /// timings. Written before the final state transition so events are
     /// available for both successful and failed runs.
@@ -1344,4 +1444,132 @@ mod tests {
             "expected source to surface rust error, got: {source}"
         );
     }
+
+    #[test]
+    fn ingest_events_writes_jobs_and_sh_events_rows() {
+        use quire_core::ci::event::{Event, EventKind, JobOutcome};
+
+        let (_dir, quire) = tmp_quire();
+        let runs = test_runs(&quire);
+        let run = runs.create(&test_meta()).expect("create");
+        let run_id = run.id().to_string();
+
+        let events = vec![
+            Event {
+                at_ms: 100,
+                kind: EventKind::JobStarted {
+                    job_id: "build".into(),
+                },
+            },
+            Event {
+                at_ms: 110,
+                kind: EventKind::ShStarted {
+                    job_id: "build".into(),
+                    cmd: "echo hi".into(),
+                },
+            },
+            Event {
+                at_ms: 190,
+                kind: EventKind::ShFinished {
+                    job_id: "build".into(),
+                    exit_code: 0,
+                },
+            },
+            Event {
+                at_ms: 200,
+                kind: EventKind::JobFinished {
+                    job_id: "build".into(),
+                    outcome: JobOutcome::Complete,
+                },
+            },
+            Event {
+                at_ms: 210,
+                kind: EventKind::JobStarted {
+                    job_id: "test".into(),
+                },
+            },
+            Event {
+                at_ms: 220,
+                kind: EventKind::JobFinished {
+                    job_id: "test".into(),
+                    outcome: JobOutcome::Failed,
+                },
+            },
+        ];
+
+        let events_path = run.path().join("events.jsonl");
+        let mut bytes = Vec::new();
+        for ev in &events {
+            bytes.extend(serde_json::to_vec(ev).unwrap());
+            bytes.push(b'\n');
+        }
+        fs_err::write(&events_path, bytes).expect("write events.jsonl");
+
+        run.ingest_events(&events_path).expect("ingest");
+
+        let db = crate::db::open(&quire.db_path()).expect("open db");
+        let jobs: Vec<(String, String, i64, i64)> = db
+            .prepare(
+                "SELECT job_id, state, started_at_ms, finished_at_ms FROM jobs \
+                 WHERE run_id = ?1 ORDER BY started_at_ms",
+            )
+            .unwrap()
+            .query_map([&run_id], |row| {
+                Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
+            })
+            .unwrap()
+            .collect::<std::result::Result<_, _>>()
+            .unwrap();
+        assert_eq!(
+            jobs,
+            vec![
+                ("build".to_string(), "complete".to_string(), 100, 200),
+                ("test".to_string(), "failed".to_string(), 210, 220),
+            ]
+        );
+
+        let sh_events: Vec<(String, i64, i64, i32, String)> = db
+            .prepare(
+                "SELECT job_id, started_at_ms, finished_at_ms, exit_code, cmd FROM sh_events \
+                 WHERE run_id = ?1 ORDER BY started_at_ms",
+            )
+            .unwrap()
+            .query_map([&run_id], |row| {
+                Ok((
+                    row.get(0)?,
+                    row.get(1)?,
+                    row.get(2)?,
+                    row.get(3)?,
+                    row.get(4)?,
+                ))
+            })
+            .unwrap()
+            .collect::<std::result::Result<_, _>>()
+            .unwrap();
+        assert_eq!(
+            sh_events,
+            vec![("build".to_string(), 110, 190, 0, "echo hi".to_string())]
+        );
+    }
+
+    #[test]
+    fn ingest_events_treats_missing_file_as_empty() {
+        let (_dir, quire) = tmp_quire();
+        let runs = test_runs(&quire);
+        let run = runs.create(&test_meta()).expect("create");
+
+        let missing = run.path().join("events.jsonl");
+        run.ingest_events(&missing)
+            .expect("missing file should not error");
+
+        let db = crate::db::open(&quire.db_path()).expect("open db");
+        let count: i64 = db
+            .query_row(
+                "SELECT COUNT(*) FROM jobs WHERE run_id = ?1",
+                [run.id()],
+                |r| r.get(0),
+            )
+            .unwrap();
+        assert_eq!(count, 0);
+    }
 }