Make quire-ci's event stream the persistent run record
Each event is `{ at_ms, kind }` — envelope holds the producer-side
timestamp; `EventKind` is a Started/Finished pair for both jobs and
sh calls. Consumers pair them by `job_id` (plus per-job sh sequence)
to assemble `jobs` / `sh_events` rows. `--events <path>` writes the
JSONL to a file alongside the existing `null`/`stdout` targets, so
the orchestrator can ingest the stream post-run when dispatching via
`:executor :quire-ci` instead of needing a parallel report mechanism.
Assisted-by: Claude Opus 4.7 via Claude Code
diff --git a/quire-ci/src/event.rs b/quire-ci/src/event.rs
index f20ee4c..847ffb7 100644
--- a/quire-ci/src/event.rs
+++ b/quire-ci/src/event.rs
@@ -1,20 +1,45 @@
//! Wire-format events emitted by `quire-ci` during a pipeline run.
//!
-//! Events are serialized as one JSON object per line (JSONL), tagged
-//! by `type`. The stream interleaves `job_started`, `sh_started`,
-//! `sh_finished`, and `job_completed` / `job_failed` events in
-//! execution order.
+//! Each event is one JSON object on its own line (JSONL). The
+//! envelope holds the producer-side timestamp; [`EventKind`] holds
+//! the variant-specific payload. `#[serde(flatten)]` keeps the wire
+//! format flat: `{"at_ms": 110, "type": "sh_started", "job_id": …}`.
+//!
+//! Consumers that need durations pair `*Started` with the matching
+//! `*Finished` by `job_id` (plus per-job sh sequence) and read both
+//! events' `at_ms` fields.
use serde::{Deserialize, Serialize};
+/// Terminal state of a job in the event stream.
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum JobOutcome {
+ Complete,
+ Failed,
+}
+
/// A single event in the run's structured output stream.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+pub struct Event {
+ /// Producer-side wall-clock millisecond timestamp.
+ pub at_ms: i64,
+ #[serde(flatten)]
+ pub kind: EventKind,
+}
+
+/// The variant-specific payload of an [`Event`].
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
-pub enum Event {
+pub enum EventKind {
+ /// A job's run-fn is about to fire.
JobStarted { job_id: String },
- JobCompleted { job_id: String },
- JobFailed { job_id: String },
+ /// A job's run-fn returned. `outcome` is `complete` if the run-fn
+ /// returned `Ok`, else `failed`.
+ JobFinished { job_id: String, outcome: JobOutcome },
+ /// An sh process is about to spawn.
ShStarted { job_id: String, cmd: String },
+ /// An sh process exited.
ShFinished { job_id: String, exit_code: i32 },
}
@@ -23,55 +48,92 @@ mod tests {
use super::*;
#[test]
- fn job_started_event_serializes_in_expected_shape() {
- let event = Event::JobStarted {
- job_id: "build".into(),
+ fn job_started_serializes_in_expected_shape() {
+ let event = Event {
+ at_ms: 100,
+ kind: EventKind::JobStarted {
+ job_id: "build".into(),
+ },
};
let json = serde_json::to_string(&event).unwrap();
- assert_eq!(json, r#"{"type":"job_started","job_id":"build"}"#);
+ assert_eq!(
+ json,
+ r#"{"at_ms":100,"type":"job_started","job_id":"build"}"#
+ );
}
#[test]
- fn job_completed_event_serializes_in_expected_shape() {
- let event = Event::JobCompleted {
- job_id: "build".into(),
+ fn job_finished_serializes_in_expected_shape() {
+ let event = Event {
+ at_ms: 250,
+ kind: EventKind::JobFinished {
+ job_id: "build".into(),
+ outcome: JobOutcome::Complete,
+ },
};
let json = serde_json::to_string(&event).unwrap();
- assert_eq!(json, r#"{"type":"job_completed","job_id":"build"}"#);
+ assert_eq!(
+ json,
+ r#"{"at_ms":250,"type":"job_finished","job_id":"build","outcome":"complete"}"#
+ );
}
#[test]
- fn job_failed_event_serializes_in_expected_shape() {
- let event = Event::JobFailed {
- job_id: "build".into(),
+ fn job_finished_failed_outcome_serializes_as_failed() {
+ let event = Event {
+ at_ms: 250,
+ kind: EventKind::JobFinished {
+ job_id: "build".into(),
+ outcome: JobOutcome::Failed,
+ },
};
let json = serde_json::to_string(&event).unwrap();
- assert_eq!(json, r#"{"type":"job_failed","job_id":"build"}"#);
+ assert!(json.contains(r#""outcome":"failed""#));
}
#[test]
- fn sh_started_event_serializes_in_expected_shape() {
- let event = Event::ShStarted {
- job_id: "build".into(),
- cmd: "echo hi".into(),
+ fn sh_started_serializes_in_expected_shape() {
+ let event = Event {
+ at_ms: 110,
+ kind: EventKind::ShStarted {
+ job_id: "build".into(),
+ cmd: "echo hi".into(),
+ },
};
let json = serde_json::to_string(&event).unwrap();
assert_eq!(
json,
- r#"{"type":"sh_started","job_id":"build","cmd":"echo hi"}"#
+ r#"{"at_ms":110,"type":"sh_started","job_id":"build","cmd":"echo hi"}"#
);
}
#[test]
- fn sh_finished_event_serializes_in_expected_shape() {
- let event = Event::ShFinished {
- job_id: "build".into(),
- exit_code: 0,
+ fn sh_finished_serializes_in_expected_shape() {
+ let event = Event {
+ at_ms: 190,
+ kind: EventKind::ShFinished {
+ job_id: "build".into(),
+ exit_code: 0,
+ },
};
let json = serde_json::to_string(&event).unwrap();
assert_eq!(
json,
- r#"{"type":"sh_finished","job_id":"build","exit_code":0}"#
+ r#"{"at_ms":190,"type":"sh_finished","job_id":"build","exit_code":0}"#
);
}
+
+ #[test]
+ fn event_round_trips_through_json() {
+ let event = Event {
+ at_ms: 110,
+ kind: EventKind::ShStarted {
+ job_id: "build".into(),
+ cmd: "echo hi".into(),
+ },
+ };
+ let json = serde_json::to_string(&event).unwrap();
+ let decoded: Event = serde_json::from_str(&json).unwrap();
+ assert_eq!(decoded, event);
+ }
}
diff --git a/quire-ci/src/main.rs b/quire-ci/src/main.rs
index d6a7827..9f424ce 100644
--- a/quire-ci/src/main.rs
+++ b/quire-ci/src/main.rs
@@ -13,7 +13,7 @@ use quire_core::ci::pipeline::{self, Pipeline, RunFn};
use quire_core::ci::run::RunMeta;
use quire_core::ci::runtime::{Runtime, RuntimeError, RuntimeEvent, RuntimeHandle};
-use crate::event::Event;
+use crate::event::{Event, EventKind, JobOutcome};
use crate::sink::{EventSink, JsonlSink, NullSink};
/// Run a quire CI pipeline locally.
@@ -41,9 +41,14 @@ enum Commands {
/// runtime doesn't yet propagate run-fn outputs into downstream
/// jobs' input views).
Run {
- /// Where to send structured run events.
- #[arg(long, value_enum, default_value_t = EventsKind::Null)]
- events: EventsKind,
+ /// Where to send the structured event stream. Accepts:
+ /// `null` — drop events (default).
+ /// `stdout` — write JSONL to stdout.
+ /// `<path>` — write JSONL to this file. The orchestrator
+ /// reads the file post-run to populate `jobs`
+ /// and `sh_events` database rows.
+ #[arg(long, default_value = "null", value_parser = parse_events_target)]
+ events: EventsTarget,
/// Directory for per-sh CRI log files. Defaults to a fresh
/// tempdir whose path is printed on stdout at the end of the
@@ -108,13 +113,21 @@ impl Drop for DumpLogsOnDrop {
}
}
-#[derive(Clone, Copy, Debug, PartialEq, Eq, clap::ValueEnum)]
-#[value(rename_all = "lowercase")]
-enum EventsKind {
- /// Drop events.
+/// Where the event stream is written. Resolved into a concrete
+/// [`EventSink`] at run time.
+#[derive(Clone, Debug)]
+enum EventsTarget {
Null,
- /// JSONL on stdout, one event per line.
Stdout,
+ File(PathBuf),
+}
+
+fn parse_events_target(s: &str) -> Result<EventsTarget, String> {
+ match s {
+ "null" => Ok(EventsTarget::Null),
+ "stdout" => Ok(EventsTarget::Stdout),
+ path => Ok(EventsTarget::File(PathBuf::from(path))),
+ }
}
fn main() -> miette::Result<()> {
@@ -124,8 +137,12 @@ fn main() -> miette::Result<()> {
Commands::Validate => validate(cli.workspace),
Commands::Run { events, out_dir } => {
let sink: Box<dyn EventSink> = match events {
- EventsKind::Null => Box::new(NullSink),
- EventsKind::Stdout => Box::new(JsonlSink::new(io::stdout())),
+ EventsTarget::Null => Box::new(NullSink),
+ EventsTarget::Stdout => Box::new(JsonlSink::new(io::stdout())),
+ EventsTarget::File(path) => {
+ let file = fs_err::File::create(&path).into_diagnostic()?;
+ Box::new(JsonlSink::new(io::BufWriter::new(file.into_parts().0)))
+ }
};
let (log_dir, _dump) = match out_dir {
Some(path) => {
@@ -202,8 +219,10 @@ fn run_pipeline(
log_dir,
));
- // Active job pointer, shared between the main loop and the runtime
- // callback (which translates RuntimeEvent → wire Event).
+ // Active job pointer, shared between the main loop and the
+ // runtime callback. The callback translates RuntimeEvent into
+ // wire events; consumers pair ShStarted/ShFinished by job_id +
+ // sequence to assemble a per-sh DB row.
let current_job: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
{
@@ -214,17 +233,21 @@ fn run_pipeline(
.borrow()
.clone()
.expect("runtime fires sh events only inside enter_job/leave_job");
- let wire_event = match event {
- RuntimeEvent::ShStarted { cmd } => Event::ShStarted {
+ let kind = match event {
+ RuntimeEvent::ShStarted { cmd } => EventKind::ShStarted {
job_id,
cmd: cmd.to_string(),
},
- RuntimeEvent::ShFinished { exit } => Event::ShFinished {
+ RuntimeEvent::ShFinished { exit } => EventKind::ShFinished {
job_id,
exit_code: exit,
},
};
- cb_sink.borrow_mut().emit(wire_event).expect("emit event");
+ let wire = Event {
+ at_ms: jiff::Timestamp::now().as_millisecond(),
+ kind,
+ };
+ cb_sink.borrow_mut().emit(wire).expect("emit sh event");
}));
}
@@ -239,8 +262,11 @@ fn run_pipeline(
*current_job.borrow_mut() = Some(job_id.clone());
sink.borrow_mut()
- .emit(Event::JobStarted {
- job_id: job_id.clone(),
+ .emit(Event {
+ at_ms: jiff::Timestamp::now().as_millisecond(),
+ kind: EventKind::JobStarted {
+ job_id: job_id.clone(),
+ },
})
.expect("emit job_started");
@@ -260,18 +286,20 @@ fn run_pipeline(
};
runtime.leave_job();
- let finish_event = if result.is_ok() {
- Event::JobCompleted {
- job_id: job_id.clone(),
- }
+ let outcome = if result.is_ok() {
+ JobOutcome::Complete
} else {
- Event::JobFailed {
- job_id: job_id.clone(),
- }
+ JobOutcome::Failed
};
sink.borrow_mut()
- .emit(finish_event)
- .expect("emit job finish");
+ .emit(Event {
+ at_ms: jiff::Timestamp::now().as_millisecond(),
+ kind: EventKind::JobFinished {
+ job_id: job_id.clone(),
+ outcome,
+ },
+ })
+ .expect("emit job_finished");
*current_job.borrow_mut() = None;
@@ -296,3 +324,24 @@ fn compile_at(workspace: &std::path::Path) -> miette::Result<Pipeline> {
let source = fs_err::read_to_string(&path).into_diagnostic()?;
Ok(pipeline::compile(&source, &path.display().to_string())?)
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn parse_events_target_classifies_input() {
+ assert!(matches!(
+ parse_events_target("null"),
+ Ok(EventsTarget::Null)
+ ));
+ assert!(matches!(
+ parse_events_target("stdout"),
+ Ok(EventsTarget::Stdout)
+ ));
+ let Ok(EventsTarget::File(path)) = parse_events_target("/tmp/run.jsonl") else {
+ panic!("expected File target");
+ };
+ assert_eq!(path, PathBuf::from("/tmp/run.jsonl"));
+ }
+}
diff --git a/quire-ci/src/sink.rs b/quire-ci/src/sink.rs
index d4e0e18..2d9ece4 100644
--- a/quire-ci/src/sink.rs
+++ b/quire-ci/src/sink.rs
@@ -54,28 +54,41 @@ impl<W: Write> EventSink for JsonlSink<W> {
mod tests {
use super::*;
+ use crate::event::EventKind;
+
+ fn sample_started(job_id: &str) -> Event {
+ Event {
+ at_ms: 1,
+ kind: EventKind::JobStarted {
+ job_id: job_id.into(),
+ },
+ }
+ }
+
#[test]
fn null_sink_accepts_events() {
let mut sink = NullSink;
- sink.emit(Event::JobStarted { job_id: "a".into() }).unwrap();
+ sink.emit(sample_started("a")).unwrap();
}
#[test]
fn jsonl_sink_writes_one_line_per_event() {
let mut sink = JsonlSink::new(Vec::<u8>::new());
- sink.emit(Event::JobStarted { job_id: "a".into() }).unwrap();
- sink.emit(Event::ShStarted {
- job_id: "a".into(),
- cmd: "echo".into(),
+ sink.emit(Event {
+ at_ms: 10,
+ kind: EventKind::ShStarted {
+ job_id: "a".into(),
+ cmd: "echo".into(),
+ },
})
.unwrap();
+ sink.emit(sample_started("a")).unwrap();
let output = sink.into_inner();
let s = std::str::from_utf8(&output).unwrap();
- assert_eq!(
- s,
- "{\"type\":\"job_started\",\"job_id\":\"a\"}\n\
- {\"type\":\"sh_started\",\"job_id\":\"a\",\"cmd\":\"echo\"}\n"
- );
+ let lines: Vec<&str> = s.lines().collect();
+ assert_eq!(lines.len(), 2);
+ assert!(lines[0].contains(r#""type":"sh_started""#));
+ assert!(lines[1].contains(r#""type":"job_started""#));
}
#[test]
@@ -99,8 +112,8 @@ mod tests {
buf: Vec::new(),
flushes: 0,
});
- sink.emit(Event::JobStarted { job_id: "a".into() }).unwrap();
- sink.emit(Event::JobStarted { job_id: "b".into() }).unwrap();
+ sink.emit(sample_started("a")).unwrap();
+ sink.emit(sample_started("b")).unwrap();
let inner = sink.into_inner();
assert_eq!(inner.flushes, 2);
}