1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
//! Event sinks consume the structured stream emitted during a run.
//!
//! [`NullSink`] drops events on the floor and is the default for
//! standalone `quire-ci run`. [`JsonlSink`] writes one JSON object per
//! line to any [`Write`], flushing per event so a crash mid-run leaves
//! a usable artifact.

use std::io::{self, Write};

use quire_core::ci::event::Event;

/// A consumer of pipeline events.
pub trait EventSink {
    fn emit(&mut self, event: Event) -> io::Result<()>;
}

/// Drops every event. Default for `quire-ci run` standalone use.
pub struct NullSink;

impl EventSink for NullSink {
    fn emit(&mut self, _event: Event) -> io::Result<()> {
        Ok(())
    }
}

/// Writes events as JSON Lines (one object per line) to a writer,
/// flushing after each event. Generic over the writer so production
/// code uses [`io::Stdout`] and tests use a `Vec<u8>`.
pub struct JsonlSink<W: Write> {
    writer: W,
}

impl<W: Write> JsonlSink<W> {
    pub fn new(writer: W) -> Self {
        Self { writer }
    }

    /// Recover the underlying writer (used by tests to inspect output).
    #[cfg(test)]
    pub fn into_inner(self) -> W {
        self.writer
    }
}

impl<W: Write> EventSink for JsonlSink<W> {
    fn emit(&mut self, event: Event) -> io::Result<()> {
        serde_json::to_writer(&mut self.writer, &event)?;
        self.writer.write_all(b"\n")?;
        self.writer.flush()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use quire_core::ci::event::EventKind;

    fn sample_started(job_id: &str) -> Event {
        Event {
            at_ms: 1,
            kind: EventKind::JobStarted {
                job_id: job_id.into(),
            },
        }
    }

    #[test]
    fn null_sink_accepts_events() {
        let mut sink = NullSink;
        sink.emit(sample_started("a")).unwrap();
    }

    #[test]
    fn jsonl_sink_writes_one_line_per_event() {
        let mut sink = JsonlSink::new(Vec::<u8>::new());
        sink.emit(Event {
            at_ms: 10,
            kind: EventKind::ShStarted {
                job_id: "a".into(),
                cmd: "echo".into(),
            },
        })
        .unwrap();
        sink.emit(sample_started("a")).unwrap();
        let output = sink.into_inner();
        let s = std::str::from_utf8(&output).unwrap();
        let lines: Vec<&str> = s.lines().collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains(r#""type":"sh_started""#));
        assert!(lines[1].contains(r#""type":"job_started""#));
    }

    #[test]
    fn jsonl_sink_flushes_per_event() {
        struct CountingWriter {
            buf: Vec<u8>,
            flushes: usize,
        }
        impl Write for CountingWriter {
            fn write(&mut self, b: &[u8]) -> io::Result<usize> {
                self.buf.extend_from_slice(b);
                Ok(b.len())
            }
            fn flush(&mut self) -> io::Result<()> {
                self.flushes += 1;
                Ok(())
            }
        }

        let mut sink = JsonlSink::new(CountingWriter {
            buf: Vec::new(),
            flushes: 0,
        });
        sink.emit(sample_started("a")).unwrap();
        sink.emit(sample_started("b")).unwrap();
        let inner = sink.into_inner();
        assert_eq!(inner.flushes, 2);
    }
}