Add structured event stream to quire-ci
Wires the runtime event callback through to a wire-format Event
emitted to a configurable sink. Adds a JSONL stdout sink (--events
stdout) so a local run can be piped into a parser, alongside the
default null sink that drops events.
The wire format is a tagged union — JobStarted, JobCompleted,
JobFailed, ShStarted, ShFinished — carrying only data the consumer
can't derive (job_id, cmd on ShStarted, exit_code on ShFinished).
Timestamps and sh indices are deliberately absent: live consumers
can stamp arrival time and count themselves.
run_pipeline now takes an EventSink as a parameter; main constructs
the sink based on the --events flag. Drops the previous
print_outputs human summary — quire-ci run is now silent without
--events stdout, matching the structured-only model.
Assisted-by: claude-opus-4-7
diff --git a/Cargo.lock b/Cargo.lock
index 7849043..cbe6437 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2133,6 +2133,8 @@ dependencies = [
"miette",
"mlua",
"quire-core",
+ "serde",
+ "serde_json",
]
[[package]]
diff --git a/quire-ci/Cargo.toml b/quire-ci/Cargo.toml
index f2678a7..3d52c67 100644
--- a/quire-ci/Cargo.toml
+++ b/quire-ci/Cargo.toml
@@ -10,3 +10,5 @@ jiff = { workspace = true }
miette = { workspace = true, features = ["fancy"] }
mlua = { workspace = true }
quire-core = { path = "../quire-core" }
+serde = { workspace = true }
+serde_json = { workspace = true }
diff --git a/quire-ci/src/event.rs b/quire-ci/src/event.rs
new file mode 100644
index 0000000..f20ee4c
--- /dev/null
+++ b/quire-ci/src/event.rs
@@ -0,0 +1,77 @@
+//! Wire-format events emitted by `quire-ci` during a pipeline run.
+//!
+//! Events are serialized as one JSON object per line (JSONL), tagged
+//! by `type`. The stream interleaves `job_started`, `sh_started`,
+//! `sh_finished`, and `job_completed` / `job_failed` events in
+//! execution order.
+
+use serde::{Deserialize, Serialize};
+
+/// A single event in the run's structured output stream.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum Event {
+ JobStarted { job_id: String },
+ JobCompleted { job_id: String },
+ JobFailed { job_id: String },
+ ShStarted { job_id: String, cmd: String },
+ ShFinished { job_id: String, exit_code: i32 },
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn job_started_event_serializes_in_expected_shape() {
+ let event = Event::JobStarted {
+ job_id: "build".into(),
+ };
+ let json = serde_json::to_string(&event).unwrap();
+ assert_eq!(json, r#"{"type":"job_started","job_id":"build"}"#);
+ }
+
+ #[test]
+ fn job_completed_event_serializes_in_expected_shape() {
+ let event = Event::JobCompleted {
+ job_id: "build".into(),
+ };
+ let json = serde_json::to_string(&event).unwrap();
+ assert_eq!(json, r#"{"type":"job_completed","job_id":"build"}"#);
+ }
+
+ #[test]
+ fn job_failed_event_serializes_in_expected_shape() {
+ let event = Event::JobFailed {
+ job_id: "build".into(),
+ };
+ let json = serde_json::to_string(&event).unwrap();
+ assert_eq!(json, r#"{"type":"job_failed","job_id":"build"}"#);
+ }
+
+ #[test]
+ fn sh_started_event_serializes_in_expected_shape() {
+ let event = Event::ShStarted {
+ job_id: "build".into(),
+ cmd: "echo hi".into(),
+ };
+ let json = serde_json::to_string(&event).unwrap();
+ assert_eq!(
+ json,
+ r#"{"type":"sh_started","job_id":"build","cmd":"echo hi"}"#
+ );
+ }
+
+ #[test]
+ fn sh_finished_event_serializes_in_expected_shape() {
+ let event = Event::ShFinished {
+ job_id: "build".into(),
+ exit_code: 0,
+ };
+ let json = serde_json::to_string(&event).unwrap();
+ assert_eq!(
+ json,
+ r#"{"type":"sh_finished","job_id":"build","exit_code":0}"#
+ );
+ }
+}
diff --git a/quire-ci/src/main.rs b/quire-ci/src/main.rs
index 9654f5b..994d4bf 100644
--- a/quire-ci/src/main.rs
+++ b/quire-ci/src/main.rs
@@ -1,4 +1,9 @@
+mod event;
+mod sink;
+
+use std::cell::RefCell;
use std::collections::HashMap;
+use std::io;
use std::path::PathBuf;
use std::rc::Rc;
@@ -7,7 +12,10 @@ use miette::IntoDiagnostic;
use mlua::IntoLua;
use quire_core::ci::pipeline::{self, Pipeline, RunFn};
use quire_core::ci::run::RunMeta;
-use quire_core::ci::runtime::{Runtime, RuntimeError, RuntimeHandle, ShOutput};
+use quire_core::ci::runtime::{Runtime, RuntimeError, RuntimeEvent, RuntimeHandle};
+
+use crate::event::Event;
+use crate::sink::{EventSink, JsonlSink, NullSink};
/// Run a quire CI pipeline locally.
#[derive(Parser)]
@@ -33,7 +41,20 @@ enum Commands {
/// reads return Nil for everything except `quire/push` (the
/// runtime doesn't yet propagate run-fn outputs into downstream
/// jobs' input views).
- Run,
+ Run {
+ /// Where to send structured run events.
+ #[arg(long, value_enum, default_value_t = EventsKind::Null)]
+ events: EventsKind,
+ },
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq, clap::ValueEnum)]
+#[value(rename_all = "lowercase")]
+enum EventsKind {
+ /// Drop events.
+ Null,
+ /// JSONL on stdout, one event per line.
+ Stdout,
}
fn main() -> miette::Result<()> {
@@ -41,7 +62,13 @@ fn main() -> miette::Result<()> {
let cli = Cli::parse();
match cli.command {
Commands::Validate => validate(cli.workspace),
- Commands::Run => run_pipeline(cli.workspace),
+ Commands::Run { events } => {
+ let sink: Box<dyn EventSink> = match events {
+ EventsKind::Null => Box::new(NullSink),
+ EventsKind::Stdout => Box::new(JsonlSink::new(io::stdout())),
+ };
+ run_pipeline(cli.workspace, sink)
+ }
}
}
@@ -70,7 +97,7 @@ fn validate(workspace: PathBuf) -> miette::Result<()> {
Ok(())
}
-fn run_pipeline(workspace: PathBuf) -> miette::Result<()> {
+fn run_pipeline(workspace: PathBuf, sink: Box<dyn EventSink>) -> miette::Result<()> {
let pipeline = compile_at(&workspace)?;
let job_ids: Vec<String> = pipeline
@@ -79,10 +106,11 @@ fn run_pipeline(workspace: PathBuf) -> miette::Result<()> {
.map(|s| s.to_string())
.collect();
if job_ids.is_empty() {
- println!("No jobs registered.");
return Ok(());
}
+ let sink: Rc<RefCell<Box<dyn EventSink>>> = Rc::new(RefCell::new(sink));
+
let meta = RunMeta {
sha: "0".repeat(40),
r#ref: "HEAD".to_string(),
@@ -98,6 +126,32 @@ fn run_pipeline(workspace: PathBuf) -> miette::Result<()> {
workspace,
));
+ // Active job pointer, shared between the main loop and the runtime
+ // callback (which translates RuntimeEvent → wire Event).
+ let current_job: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
+
+ {
+ let cb_sink = sink.clone();
+ let cb_current_job = current_job.clone();
+ runtime.set_event_callback(Box::new(move |event| {
+ let job_id = cb_current_job
+ .borrow()
+ .clone()
+ .expect("runtime fires sh events only inside enter_job/leave_job");
+ let wire_event = match event {
+ RuntimeEvent::ShStarted { cmd } => Event::ShStarted {
+ job_id,
+ cmd: cmd.to_string(),
+ },
+ RuntimeEvent::ShFinished { exit } => Event::ShFinished {
+ job_id,
+ exit_code: exit,
+ },
+ };
+ cb_sink.borrow_mut().emit(wire_event).expect("emit event");
+ }));
+ }
+
// Install the runtime handle on the Lua VM once for the whole run;
// each job's run-fn receives `rt_value` as its sole argument.
let lua = runtime.lua();
@@ -107,6 +161,14 @@ fn run_pipeline(workspace: PathBuf) -> miette::Result<()> {
let mut failed_job: Option<(String, RuntimeError)> = None;
for job_id in &job_ids {
+ *current_job.borrow_mut() = Some(job_id.clone());
+
+ sink.borrow_mut()
+ .emit(Event::JobStarted {
+ job_id: job_id.clone(),
+ })
+ .expect("emit job_started");
+
let run_fn = runtime
.job(job_id)
.expect("topo_order returns valid ids")
@@ -123,6 +185,21 @@ fn run_pipeline(workspace: PathBuf) -> miette::Result<()> {
};
runtime.leave_job();
+ let finish_event = if result.is_ok() {
+ Event::JobCompleted {
+ job_id: job_id.clone(),
+ }
+ } else {
+ Event::JobFailed {
+ job_id: job_id.clone(),
+ }
+ };
+ sink.borrow_mut()
+ .emit(finish_event)
+ .expect("emit job finish");
+
+ *current_job.borrow_mut() = None;
+
if let Err(e) = result {
failed_job = Some((job_id.clone(), e));
break;
@@ -131,32 +208,10 @@ fn run_pipeline(workspace: PathBuf) -> miette::Result<()> {
lua.remove_app_data::<Rc<Runtime>>();
- let outputs = runtime.take_outputs();
- print_outputs(&job_ids, &outputs);
-
- if let Some((job, err)) = failed_job {
- eprintln!("\nJob '{job}' failed.");
+ if let Some((_, err)) = failed_job {
return Err(err.into());
}
- let nonzero: Vec<&str> = job_ids
- .iter()
- .filter(|id| {
- outputs
- .get(id.as_str())
- .is_some_and(|os| os.iter().any(|o| o.exit != 0))
- })
- .map(String::as_str)
- .collect();
- if nonzero.is_empty() {
- println!("\nAll jobs passed.");
- } else {
- println!(
- "\n{} job(s) had non-zero `(sh ...)` exits: {}",
- nonzero.len(),
- nonzero.join(", ")
- );
- }
Ok(())
}
@@ -166,28 +221,3 @@ fn compile_at(workspace: &std::path::Path) -> miette::Result<Pipeline> {
let source = fs_err::read_to_string(&path).into_diagnostic()?;
Ok(pipeline::compile(&source, &path.display().to_string())?)
}
-
-/// Print captured `(sh …)` outputs, grouped by job, in execution
-/// order. Skips jobs with no recorded output.
-fn print_outputs(job_ids: &[String], outputs: &HashMap<String, Vec<ShOutput>>) {
- for job_id in job_ids {
- let Some(job_outputs) = outputs.get(job_id) else {
- continue;
- };
- if job_outputs.is_empty() {
- continue;
- }
- println!("==> {job_id}");
- for o in job_outputs {
- if !o.stdout.is_empty() {
- print!("{}", o.stdout);
- }
- if !o.stderr.is_empty() {
- eprint!("{}", o.stderr);
- }
- if o.exit != 0 {
- eprintln!("(exit {})", o.exit);
- }
- }
- }
-}
diff --git a/quire-ci/src/sink.rs b/quire-ci/src/sink.rs
new file mode 100644
index 0000000..d4e0e18
--- /dev/null
+++ b/quire-ci/src/sink.rs
@@ -0,0 +1,107 @@
+//! Event sinks consume the structured stream emitted during a run.
+//!
+//! [`NullSink`] drops events on the floor and is the default for
+//! standalone `quire-ci run`. [`JsonlSink`] writes one JSON object per
+//! line to any [`Write`], flushing per event so a crash mid-run leaves
+//! a usable artifact.
+
+use std::io::{self, Write};
+
+use crate::event::Event;
+
+/// A consumer of pipeline events.
+pub trait EventSink {
+ fn emit(&mut self, event: Event) -> io::Result<()>;
+}
+
+/// Drops every event. Default for `quire-ci run` standalone use.
+pub struct NullSink;
+
+impl EventSink for NullSink {
+ fn emit(&mut self, _event: Event) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+/// Writes events as JSON Lines (one object per line) to a writer,
+/// flushing after each event. Generic over the writer so production
+/// code uses [`io::Stdout`] and tests use a `Vec<u8>`.
+pub struct JsonlSink<W: Write> {
+ writer: W,
+}
+
+impl<W: Write> JsonlSink<W> {
+ pub fn new(writer: W) -> Self {
+ Self { writer }
+ }
+
+ /// Recover the underlying writer (used by tests to inspect output).
+ #[cfg(test)]
+ pub fn into_inner(self) -> W {
+ self.writer
+ }
+}
+
+impl<W: Write> EventSink for JsonlSink<W> {
+ fn emit(&mut self, event: Event) -> io::Result<()> {
+ serde_json::to_writer(&mut self.writer, &event)?;
+ self.writer.write_all(b"\n")?;
+ self.writer.flush()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn null_sink_accepts_events() {
+ let mut sink = NullSink;
+ sink.emit(Event::JobStarted { job_id: "a".into() }).unwrap();
+ }
+
+ #[test]
+ fn jsonl_sink_writes_one_line_per_event() {
+ let mut sink = JsonlSink::new(Vec::<u8>::new());
+ sink.emit(Event::JobStarted { job_id: "a".into() }).unwrap();
+ sink.emit(Event::ShStarted {
+ job_id: "a".into(),
+ cmd: "echo".into(),
+ })
+ .unwrap();
+ let output = sink.into_inner();
+ let s = std::str::from_utf8(&output).unwrap();
+ assert_eq!(
+ s,
+ "{\"type\":\"job_started\",\"job_id\":\"a\"}\n\
+ {\"type\":\"sh_started\",\"job_id\":\"a\",\"cmd\":\"echo\"}\n"
+ );
+ }
+
+ #[test]
+ fn jsonl_sink_flushes_per_event() {
+ struct CountingWriter {
+ buf: Vec<u8>,
+ flushes: usize,
+ }
+ impl Write for CountingWriter {
+ fn write(&mut self, b: &[u8]) -> io::Result<usize> {
+ self.buf.extend_from_slice(b);
+ Ok(b.len())
+ }
+ fn flush(&mut self) -> io::Result<()> {
+ self.flushes += 1;
+ Ok(())
+ }
+ }
+
+ let mut sink = JsonlSink::new(CountingWriter {
+ buf: Vec::new(),
+ flushes: 0,
+ });
+ sink.emit(Event::JobStarted { job_id: "a".into() }).unwrap();
+ sink.emit(Event::JobStarted { job_id: "b".into() }).unwrap();
+ let inner = sink.into_inner();
+ assert_eq!(inner.flushes, 2);
+ }
+}