wsprism_gateway/obs/
metrics.rs

//! Minimal metrics registry for the gateway.
//!
//! Apart from `dashmap`, no external crates are used; this module provides
//! counter, gauge, and histogram types with dynamic labels backed by `DashMap`.
//! Each label set is flattened into a sorted vector of key/value pairs so that
//! the ordering is deterministic regardless of how callers pass the labels.
//! Histogram buckets are fixed in microseconds to avoid floating-point math.
7
8use dashmap::DashMap;
9use std::fmt::Write;
10use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
11use std::time::Duration;
12
/// Escapes a label value so it can be placed between double quotes in the
/// rendered metrics text.
///
/// A backslash becomes `\\`, a double quote becomes `\"`, and a line feed
/// becomes the two characters `\n`. Every other character is copied as-is.
fn escape_label(v: &str) -> String {
    let mut escaped = String::with_capacity(v.len());
    for ch in v.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
17
18#[derive(Default)]
19pub struct CounterVec {
20    map: DashMap<Vec<(String, String)>, AtomicU64>,
21}
22
23impl CounterVec {
24    /// Increment by 1.
25    pub fn inc(&self, labels: &[(&str, &str)]) {
26        self.add(labels, 1);
27    }
28
29    /// Increment by an arbitrary value.
30    pub fn add(&self, labels: &[(&str, &str)], v: u64) {
31        let mut key: Vec<(String, String)> = labels.iter()
32            .map(|(k, v)| (k.to_string(), v.to_string()))
33            .collect();
34        key.sort();
35        
36        let counter = self.map.entry(key).or_insert_with(|| AtomicU64::new(0));
37        counter.fetch_add(v, Ordering::Relaxed);
38    }
39
40    /// Render in Prometheus text exposition format.
41    fn render(&self, name: &str, out: &mut String) {
42        let _ = writeln!(out, "# TYPE {} counter", name);
43        for r in self.map.iter() {
44            let key = r.key();
45            let val = r.value().load(Ordering::Relaxed);
46            let label_str = key.iter()
47                .map(|(k, v)| format!("{}=\"{}\"", k, escape_label(v)))
48                .collect::<Vec<_>>().join(",");
49            let _ = writeln!(out, "{}{{{}}} {}", name, label_str, val);
50        }
51    }
52}
53
54#[derive(Default)]
55pub struct GaugeVec {
56    map: DashMap<Vec<(String, String)>, AtomicI64>,
57}
58
59impl GaugeVec {
60    /// Increment by 1.
61    pub fn inc(&self, labels: &[(&str, &str)]) { self.add(labels, 1); }
62    /// Decrement by 1.
63    pub fn dec(&self, labels: &[(&str, &str)]) { self.add(labels, -1); }
64
65    /// Add an arbitrary signed delta.
66    pub fn add(&self, labels: &[(&str, &str)], v: i64) {
67        let mut key: Vec<(String, String)> = labels.iter()
68            .map(|(k, v)| (k.to_string(), v.to_string()))
69            .collect();
70        key.sort();
71        let gauge = self.map.entry(key).or_insert_with(|| AtomicI64::new(0));
72        gauge.fetch_add(v, Ordering::Relaxed);
73    }
74
75    /// Render in Prometheus text exposition format.
76    fn render(&self, name: &str, out: &mut String) {
77        let _ = writeln!(out, "# TYPE {} gauge", name);
78        for r in self.map.iter() {
79            let key = r.key();
80            let val = r.value().load(Ordering::Relaxed);
81            let label_str = key.iter()
82                .map(|(k, v)| format!("{}=\"{}\"", k, escape_label(v)))
83                .collect::<Vec<_>>().join(",");
84            let _ = writeln!(out, "{}{{{}}} {}", name, label_str, val);
85        }
86    }
87}
88
/// Upper bounds of the fixed histogram buckets, in microseconds (µs),
/// listed in ascending order.
const BUCKETS_MICROS: [u64; 9] = [
    100,       // 100 µs
    500,       // 500 µs
    1_000,     // 1 ms
    5_000,     // 5 ms
    10_000,    // 10 ms
    50_000,    // 50 ms
    100_000,   // 100 ms
    500_000,   // 500 ms
    1_000_000, // 1 s
];
92
/// Atomic storage for a single histogram series.
///
/// `Default` is derived rather than hand-written: every field is an
/// `AtomicU64` (or an array of them), whose default value is zero, so the
/// derived impl yields an all-zero histogram.
#[derive(Default)]
struct AtomicHistogram {
    /// Number of observations recorded.
    count: AtomicU64,
    /// Running sum of the observed values, in microseconds.
    sum: AtomicU64,
    /// Cumulative per-bucket counts; slot `i` pairs with `BUCKETS_MICROS[i]`.
    buckets: [AtomicU64; 9],
}
112
113#[derive(Default)]
114pub struct HistogramVec {
115    map: DashMap<Vec<(String, String)>, AtomicHistogram>,
116}
117
118impl HistogramVec {
119    /// Observe a duration and increment cumulative buckets (microsecond scale).
120    pub fn observe(&self, labels: &[(&str, &str)], duration: Duration) {
121        let mut key: Vec<(String, String)> = labels.iter()
122            .map(|(k, v)| (k.to_string(), v.to_string()))
123            .collect();
124        key.sort();
125
126        let hist = self.map.entry(key).or_insert_with(AtomicHistogram::default);
127        let micros = duration.as_micros() as u64;
128
129        hist.count.fetch_add(1, Ordering::Relaxed);
130        hist.sum.fetch_add(micros, Ordering::Relaxed); // Record sum in micros
131
132        // Cumulative Buckets: Increment ALL buckets larger than value
133        for (i, &b) in BUCKETS_MICROS.iter().enumerate() {
134            if micros <= b {
135                hist.buckets[i].fetch_add(1, Ordering::Relaxed);
136            }
137        }
138    }
139
140    /// Render in Prometheus text exposition format (unit: microseconds).
141    fn render(&self, name: &str, out: &mut String) {
142        let _ = writeln!(out, "# TYPE {} histogram", name);
143        for r in self.map.iter() {
144            let key = r.key();
145            let hist = r.value();
146            
147            let label_str = key.iter()
148                .map(|(k, v)| format!("{}=\"{}\"", k, escape_label(v)))
149                .collect::<Vec<_>>().join(",");
150            let prefix = if label_str.is_empty() { String::new() } else { format!("{},", label_str) };
151
152            for (i, &le) in BUCKETS_MICROS.iter().enumerate() {
153                let count = hist.buckets[i].load(Ordering::Relaxed);
154                // Convert bucket le from micros to seconds string for standard Prometheus display? 
155                // Or just keep int? Standard is seconds (float).
156                // For simplicity in this int-based impl, we output micros but should ideally label unit.
157                // Let's output integer micros for 'le'.
158                let _ = writeln!(out, "{}_bucket{{{}le=\"{}\"}} {}", name, prefix, le, count);
159            }
160            let count = hist.count.load(Ordering::Relaxed);
161            let _ = writeln!(out, "{}_bucket{{{}le=\"+Inf\"}} {}", name, prefix, count);
162            
163            let sum = hist.sum.load(Ordering::Relaxed);
164            let _ = writeln!(out, "{}_sum{{{}}} {}", name, label_str, sum);
165            let _ = writeln!(out, "{}_count{{{}}} {}", name, label_str, count);
166        }
167    }
168}
169
170#[derive(Default)]
171pub struct GatewayMetrics {
172    pub ws_upgrades: CounterVec,
173    pub ws_active_sessions: GaugeVec,
174    pub policy_decisions: CounterVec,
175    pub handshake_rejections: CounterVec,
176    pub dispatch_duration: HistogramVec, // In Microseconds
177    pub decode_errors: CounterVec,
178    pub service_errors: CounterVec,
179    pub writer_timeouts: CounterVec,
180    pub unknown_service_errors: CounterVec,
181    draining: std::sync::atomic::AtomicBool,
182}
183
184impl GatewayMetrics {
185    /// Mark draining state.
186    pub fn set_draining(&self) { self.draining.store(true, Ordering::Relaxed); }
187    /// Return whether draining is active.
188    pub fn is_draining(&self) -> bool { self.draining.load(Ordering::Relaxed) }
189
190    /// Render all registered metrics plus any extra lines provided by callers.
191    pub fn render(&self, extra: &[(&str, u64)]) -> String {
192        let mut out = String::new();
193        self.ws_upgrades.render("wsprism_ws_upgrades_total", &mut out);
194        self.ws_active_sessions.render("wsprism_ws_sessions_active", &mut out);
195        self.policy_decisions.render("wsprism_policy_decisions_total", &mut out);
196        self.handshake_rejections.render("wsprism_handshake_rejections_total", &mut out);
197        self.dispatch_duration.render("wsprism_dispatch_duration_micros", &mut out); // Explicit unit
198        self.decode_errors.render("wsprism_decode_errors_total", &mut out);
199        self.service_errors.render("wsprism_service_errors_total", &mut out);
200        self.writer_timeouts.render("wsprism_writer_timeouts_total", &mut out);
201        self.unknown_service_errors.render("wsprism_unknown_service_total", &mut out);
202        
203        let _ = writeln!(out, "# TYPE wsprism_draining gauge\nwsprism_draining {}", if self.is_draining() { 1 } else { 0 });
204        for (k, v) in extra { let _ = writeln!(out, "{} {}", k, v); }
205        out
206    }
207}