Replace log.yml with sh_events table and CRI log files
Per-sh output now writes to jobs/<job-id>/sh-N.log in k8s CRI format
and records timing metadata in a new sh_events SQLite table. The jobs
table is populated during execution with per-job start/finish state.
Removes the old log.yml serialization path.

Assisted-by: GLM-5.1 via pi
change wmkzzqnwryylnkounvwsvxypmmxvusws
commit 3dc2b742c35e314754ec10d927c4dc95116ef590
author Alpha Chen <alpha@kejadlen.dev>
date
parent znrxuwqn
diff --git a/migrations/0002_sh_events.sql b/migrations/0002_sh_events.sql
new file mode 100644
index 0000000..f9da4cc
--- /dev/null
+++ b/migrations/0002_sh_events.sql
@@ -0,0 +1,10 @@
+CREATE TABLE sh_events (
+  run_id         TEXT NOT NULL,
+  job_id         TEXT NOT NULL,
+  started_at_ms  INTEGER NOT NULL,
+  finished_at_ms INTEGER NOT NULL,
+  exit_code      INTEGER NOT NULL,
+  cmd            TEXT NOT NULL,
+  PRIMARY KEY (run_id, job_id, started_at_ms),
+  FOREIGN KEY (run_id, job_id) REFERENCES jobs(run_id, job_id) ON DELETE CASCADE
+);
diff --git a/src/ci/logs.rs b/src/ci/logs.rs
new file mode 100644
index 0000000..2c734a1
--- /dev/null
+++ b/src/ci/logs.rs
@@ -0,0 +1,93 @@
+//! Per-sh CRI log files for CI runs.
+//!
+//! Each `(sh ...)` call within a job produces a file at
+//! `jobs/<job-id>/sh-<n>.log` in k8s CRI log format:
+//!
+//! ```text
+//! <RFC3339 ts> <stream> <tag> <content>
+//! ```
+//!
+//! Stream is `stdout` or `stderr`. Tag is `F` (full line).
+
+use std::path::Path;
+
+use super::runtime::ShOutput;
+
+/// Write a sh output to a CRI log file.
+///
+/// Each line of stdout/stderr becomes one CRI-format line with the
+/// given base timestamp, stream tag, and `F` (full) tag.
+pub fn write_cri_log(path: &Path, output: &ShOutput, ts: &str) -> std::io::Result<()> {
+    use std::io::Write;
+
+    let mut f = std::fs::File::create(path)?;
+
+    for line in output.stdout.lines() {
+        writeln!(f, "{ts} stdout F {line}")?;
+    }
+
+    for line in output.stderr.lines() {
+        writeln!(f, "{ts} stderr F {line}")?;
+    }
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn cri_log_splits_stdout_into_lines() {
+        let dir = tempfile::tempdir().expect("tempdir");
+        let path = dir.path().join("sh-1.log");
+        let output = ShOutput {
+            exit: 0,
+            stdout: "line one\nline two\n".to_string(),
+            stderr: String::new(),
+            cmd: "[\"echo\"]".to_string(),
+        };
+
+        write_cri_log(&path, &output, "2026-05-06T12:00:00Z").expect("write");
+
+        let contents = std::fs::read_to_string(&path).expect("read");
+        let lines: Vec<&str> = contents.lines().collect();
+        assert_eq!(lines.len(), 2);
+        assert!(lines[0].contains("stdout F line one"));
+        assert!(lines[1].contains("stdout F line two"));
+    }
+
+    #[test]
+    fn cri_log_handles_stderr() {
+        let dir = tempfile::tempdir().expect("tempdir");
+        let path = dir.path().join("sh-1.log");
+        let output = ShOutput {
+            exit: 1,
+            stdout: String::new(),
+            stderr: "an error\n".to_string(),
+            cmd: "[\"false\"]".to_string(),
+        };
+
+        write_cri_log(&path, &output, "2026-05-06T12:00:00Z").expect("write");
+
+        let contents = std::fs::read_to_string(&path).expect("read");
+        assert!(contents.contains("stderr F an error"));
+    }
+
+    #[test]
+    fn cri_log_handles_empty_output() {
+        let dir = tempfile::tempdir().expect("tempdir");
+        let path = dir.path().join("sh-1.log");
+        let output = ShOutput {
+            exit: 0,
+            stdout: String::new(),
+            stderr: String::new(),
+            cmd: "true".to_string(),
+        };
+
+        write_cri_log(&path, &output, "2026-05-06T12:00:00Z").expect("write");
+
+        let contents = std::fs::read_to_string(&path).expect("read");
+        assert!(contents.is_empty());
+    }
+}
diff --git a/src/ci/mod.rs b/src/ci/mod.rs
index a40e717..6d971a6 100644
--- a/src/ci/mod.rs
+++ b/src/ci/mod.rs
@@ -3,6 +3,7 @@
 use std::collections::HashMap;
 
 pub(crate) mod docker;
+pub(crate) mod logs;
 mod mirror;
 mod pipeline;
 mod registration;
diff --git a/src/ci/run.rs b/src/ci/run.rs
index a61a2ac..e8fd60a 100644
--- a/src/ci/run.rs
+++ b/src/ci/run.rs
@@ -310,6 +310,16 @@ impl Run {
                 .run_fn
                 .clone();
 
+            // Insert job row in 'active' state.
+            let job_started = Timestamp::now().as_millisecond();
+            {
+                let db = crate::db::open(&self.db_path)?;
+                db.execute(
+                    "INSERT INTO jobs (run_id, job_id, state, started_at_ms) VALUES (?1, ?2, 'active', ?3)",
+                    rusqlite::params![&self.id, job_id, job_started],
+                )?;
+            }
+
             runtime.enter_job(job_id);
             let result: Result<()> = (|| match run_fn {
                 RunFn::Lua(f) => {
@@ -320,6 +330,20 @@ impl Run {
             })();
             runtime.leave_job();
 
+            // Update job row to terminal state.
+            let job_finished = Timestamp::now().as_millisecond();
+            let (job_state, exit_code) = match &result {
+                Ok(()) => ("complete", None::<i32>),
+                Err(_) => ("failed", None::<i32>),
+            };
+            {
+                let db = crate::db::open(&self.db_path)?;
+                db.execute(
+                    "UPDATE jobs SET state = ?1, exit_code = ?2, finished_at_ms = ?3 WHERE run_id = ?4 AND job_id = ?5",
+                    rusqlite::params![job_state, exit_code, job_finished, &self.id, job_id],
+                )?;
+            }
+
             if let Err(e) = result {
                 failed_job = Some((job_id.to_string(), e));
                 break;
@@ -329,9 +353,10 @@ impl Run {
         // Always drain outputs and write logs, even on failure — the
         // jobs that did run before the failure are useful context.
         let outputs = runtime.take_outputs();
+        let timings = runtime.take_sh_timings();
         lua.remove_app_data::<Rc<Runtime>>();
 
-        self.write_all_logs(&outputs)?;
+        self.write_sh_records(&outputs, &timings)?;
 
         // Drop the runtime *before* the final transition. In docker
         // mode this fires `DockerLifecycle::drop`, which stamps
@@ -410,20 +435,60 @@ impl Run {
         }
     }
 
-    /// Write per-job log files from the captured `(sh …)` outputs.
-    ///
-    /// Creates `jobs/<job-id>/log.yml` in the run directory for each
-    /// job that has outputs. Written before the final state transition
-    /// so logs are available for both successful and failed runs.
-    fn write_all_logs(&self, outputs: &HashMap<String, Vec<ShOutput>>) -> Result<()> {
+    /// Write sh events to the database and per-sh CRI log files to
+    /// disk. Written before the final state transition so logs are
+    /// available for both successful and failed runs.
+    fn write_sh_records(
+        &self,
+        outputs: &HashMap<String, Vec<ShOutput>>,
+        timings: &HashMap<String, Vec<(usize, jiff::Timestamp, jiff::Timestamp)>>,
+    ) -> Result<()> {
+        if outputs.is_empty() {
+            return Ok(());
+        }
+
+        let db = crate::db::open(&self.db_path)?;
+
         for (job_id, sh_outputs) in outputs {
-            if sh_outputs.is_empty() {
-                continue;
-            }
+            let job_timings = timings.get(job_id);
             let job_dir = self.path().join("jobs").join(job_id);
-            fs_err::create_dir_all(&job_dir)?;
-            write_yaml(&job_dir.join("log.yml"), sh_outputs)?;
+
+            for (i, output) in sh_outputs.iter().enumerate() {
+                let (n, started_at, finished_at) = job_timings
+                    .and_then(|t| t.get(i))
+                    .copied()
+                    .unwrap_or_else(|| {
+                        // Fallback if timing wasn't captured (shouldn't happen).
+                        let n = i + 1;
+                        let now = jiff::Timestamp::now();
+                        (n, now, now)
+                    });
+
+                // Write CRI log file.
+                fs_err::create_dir_all(&job_dir)?;
+                let sh_path = job_dir.join(format!("sh-{n}.log"));
+                super::logs::write_cri_log(
+                    &sh_path,
+                    output,
+                    &started_at.to_string(),
+                )?;
+
+                // Insert sh event into the database.
+                db.execute(
+                    "INSERT INTO sh_events (run_id, job_id, started_at_ms, finished_at_ms, exit_code, cmd)
+                     VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
+                    rusqlite::params![
+                        &self.id,
+                        job_id,
+                        started_at.as_millisecond(),
+                        finished_at.as_millisecond(),
+                        output.exit,
+                        &output.cmd,
+                    ],
+                )?;
+            }
         }
+
         Ok(())
     }
 
@@ -582,14 +647,6 @@ pub fn materialize_workspace(git_dir: &Path, sha: &str, workspace: &Path) -> Res
     Ok(())
 }
 
-/// Write a serializable value to a YAML file atomically (temp file + rename).
-pub(crate) fn write_yaml<T: serde::Serialize>(path: &Path, value: &T) -> Result<()> {
-    let tmp_path = path.with_extension("yml.tmp");
-    let f = fs_err::File::create(&tmp_path)?;
-    serde_yaml_ng::to_writer(std::io::BufWriter::new(f), value)?;
-    fs_err::rename(&tmp_path, path)?;
-    Ok(())
-}
 
 #[cfg(test)]
 mod tests {
@@ -1255,22 +1312,28 @@ mod tests {
         )
         .expect("execute");
 
+        // CRI log file should exist.
         let log_path = runs
             .base_dir
             .join(&run_id)
             .join("jobs")
             .join("greet")
-            .join("log.yml");
-        assert!(log_path.exists(), "job log file should exist");
-
-        let entries: Vec<ShOutput> =
-            serde_yaml_ng::from_str(&fs_err::read_to_string(&log_path).expect("read log"))
-                .expect("parse log");
-        assert_eq!(entries.len(), 1);
-        assert_eq!(entries[0].exit, 0);
-        assert_eq!(entries[0].stdout, "hello\n");
-        assert!(entries[0].stderr.is_empty());
-        assert_eq!(entries[0].cmd, "[\"echo\", \"hello\"]");
+            .join("sh-1.log");
+        assert!(log_path.exists(), "sh-1.log should exist");
+
+        let contents = fs_err::read_to_string(&log_path).expect("read log");
+        assert!(contents.contains("stdout F hello"));
+
+        // sh_events table should have one row.
+        let db = crate::db::open(&quire.db_path()).expect("db");
+        let count: i64 = db
+            .query_row(
+                "SELECT COUNT(*) FROM sh_events WHERE run_id = ?1 AND job_id = 'greet'",
+                rusqlite::params![&run_id],
+                |row| row.get(0),
+            )
+            .expect("query");
+        assert_eq!(count, 1);
     }
 
     #[test]
@@ -1298,17 +1361,14 @@ mod tests {
         let failed_dir = runs.base_dir.join(&run_id);
         assert!(failed_dir.exists(), "run directory should exist");
 
-        let log_path = failed_dir.join("jobs").join("a").join("log.yml");
+        let log_path = failed_dir.join("jobs").join("a").join("sh-1.log");
         assert!(
             log_path.exists(),
-            "job 'a' log should exist even though 'b' failed"
+            "job 'a' sh-1.log should exist even though 'b' failed"
         );
 
-        let entries: Vec<ShOutput> =
-            serde_yaml_ng::from_str(&fs_err::read_to_string(&log_path).expect("read log"))
-                .expect("parse log");
-        assert_eq!(entries.len(), 1);
-        assert_eq!(entries[0].stdout, "from-a\n");
+        let contents = fs_err::read_to_string(&log_path).expect("read log");
+        assert!(contents.contains("stdout F from-a"));
     }
 
     #[test]
diff --git a/src/ci/runtime.rs b/src/ci/runtime.rs
index dca1282..518de9c 100644
--- a/src/ci/runtime.rs
+++ b/src/ci/runtime.rs
@@ -11,6 +11,7 @@ use std::collections::HashMap;
 use std::rc::Rc;
 
 use mlua::{IntoLua, Lua, LuaSerdeExt};
+use jiff::Timestamp;
 
 use super::pipeline::{Job, Pipeline};
 use super::run::{DockerLifecycle, RunMeta};
@@ -53,6 +54,11 @@ pub(super) struct Runtime {
     pub(super) inputs: HashMap<String, HashMap<String, Option<mlua::Value>>>,
     pub(super) current_job: RefCell<Option<String>>,
     pub(super) outputs: RefCell<HashMap<String, Vec<ShOutput>>>,
+    /// Per-sh timing records: job_id → (sh_index, started_at, finished_at).
+    /// Parallel to `outputs`; each entry at the same index corresponds.
+    pub(super) sh_timings: RefCell<HashMap<String, Vec<(usize, Timestamp, Timestamp)>>>,
+    /// Per-job sh call counter for assigning sequential indices.
+    sh_counter: RefCell<HashMap<String, usize>>,
     /// The materialized workspace for this run. Every `(sh …)` call
     /// runs here.
     workspace: std::path::PathBuf,
@@ -118,6 +124,8 @@ impl Runtime {
             inputs,
             current_job: RefCell::new(None),
             outputs: RefCell::new(HashMap::new()),
+            sh_timings: RefCell::new(HashMap::new()),
+            sh_counter: RefCell::new(HashMap::new()),
             workspace,
             executor,
         }
@@ -178,6 +186,11 @@ impl Runtime {
         std::mem::take(&mut *self.outputs.borrow_mut())
     }
 
+    /// Drain all recorded sh timings, returning them keyed by job id.
+    pub(super) fn take_sh_timings(&self) -> HashMap<String, Vec<(usize, Timestamp, Timestamp)>> {
+        std::mem::take(&mut *self.sh_timings.borrow_mut())
+    }
+
     /// Resolve a declared secret by name. Errors if the name isn't
     /// declared or the secret's source can't be read.
     pub(super) fn secret(&self, name: &str) -> super::error::Result<String> {
@@ -197,6 +210,7 @@ impl Runtime {
     /// container's working directory, and `opts.env` is forwarded as
     /// `-e KEY=VAL` flags.
     pub(super) fn sh(&self, cmd: Cmd, opts: ShOpts) -> super::error::Result<ShOutput> {
+        let started_at = Timestamp::now();
         let output = match self.docker_target() {
             None => cmd.run(opts, &self.workspace)?,
             Some((container_id, work_dir)) => {
@@ -206,12 +220,25 @@ impl Runtime {
                 wrapped.run(ShOpts::default(), &self.workspace)?
             }
         };
+        let finished_at = Timestamp::now();
         if let Some(job) = self.current_job.borrow().as_ref() {
+            let n = {
+                let mut counter = self.sh_counter.borrow_mut();
+                let entry = counter.entry(job.clone()).or_insert(1);
+                let n = *entry;
+                *entry += 1;
+                n
+            };
             self.outputs
                 .borrow_mut()
                 .entry(job.clone())
                 .or_default()
                 .push(output.clone());
+            self.sh_timings
+                .borrow_mut()
+                .entry(job.clone())
+                .or_default()
+                .push((n, started_at, finished_at));
         }
         Ok(output)
     }
@@ -229,6 +256,8 @@ impl Runtime {
             inputs: HashMap::new(),
             current_job: RefCell::new(None),
             outputs: RefCell::new(HashMap::new()),
+            sh_timings: RefCell::new(HashMap::new()),
+            sh_counter: RefCell::new(HashMap::new()),
             workspace: std::env::current_dir().expect("cwd"),
             executor: ExecutorRuntime::Host,
         }
diff --git a/src/db.rs b/src/db.rs
index 39d6f9c..d7f473f 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -11,7 +11,10 @@ use rusqlite_migration::{M, Migrations};
 /// The ordered set of schema migrations. Append-only — never edit
 /// a migration that has already shipped.
 static MIGRATIONS: std::sync::LazyLock<Migrations<'static>> = std::sync::LazyLock::new(|| {
-    Migrations::new(vec![M::up(include_str!("../migrations/0001_initial.sql"))])
+    Migrations::new(vec![
+        M::up(include_str!("../migrations/0001_initial.sql")),
+        M::up(include_str!("../migrations/0002_sh_events.sql")),
+    ])
 });
 
 /// Error from running migrations.