1use dashmap::DashMap;
9use std::fmt::Write;
10use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
11use std::time::Duration;
12
/// Escapes a label value so it can be embedded between double quotes in the
/// rendered metrics text.
///
/// A backslash becomes `\\`, a double quote becomes `\"`, and a line feed
/// becomes the two characters `\n`; every other character is copied unchanged.
fn escape_label(v: &str) -> String {
    // Each escape only ever grows the text, so the input length is a lower
    // bound for the required capacity.
    let mut escaped = String::with_capacity(v.len());
    for ch in v.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
17
18#[derive(Default)]
19pub struct CounterVec {
20 map: DashMap<Vec<(String, String)>, AtomicU64>,
21}
22
23impl CounterVec {
24 pub fn inc(&self, labels: &[(&str, &str)]) {
26 self.add(labels, 1);
27 }
28
29 pub fn add(&self, labels: &[(&str, &str)], v: u64) {
31 let mut key: Vec<(String, String)> = labels.iter()
32 .map(|(k, v)| (k.to_string(), v.to_string()))
33 .collect();
34 key.sort();
35
36 let counter = self.map.entry(key).or_insert_with(|| AtomicU64::new(0));
37 counter.fetch_add(v, Ordering::Relaxed);
38 }
39
40 fn render(&self, name: &str, out: &mut String) {
42 let _ = writeln!(out, "# TYPE {} counter", name);
43 for r in self.map.iter() {
44 let key = r.key();
45 let val = r.value().load(Ordering::Relaxed);
46 let label_str = key.iter()
47 .map(|(k, v)| format!("{}=\"{}\"", k, escape_label(v)))
48 .collect::<Vec<_>>().join(",");
49 let _ = writeln!(out, "{}{{{}}} {}", name, label_str, val);
50 }
51 }
52}
53
54#[derive(Default)]
55pub struct GaugeVec {
56 map: DashMap<Vec<(String, String)>, AtomicI64>,
57}
58
59impl GaugeVec {
60 pub fn inc(&self, labels: &[(&str, &str)]) { self.add(labels, 1); }
62 pub fn dec(&self, labels: &[(&str, &str)]) { self.add(labels, -1); }
64
65 pub fn add(&self, labels: &[(&str, &str)], v: i64) {
67 let mut key: Vec<(String, String)> = labels.iter()
68 .map(|(k, v)| (k.to_string(), v.to_string()))
69 .collect();
70 key.sort();
71 let gauge = self.map.entry(key).or_insert_with(|| AtomicI64::new(0));
72 gauge.fetch_add(v, Ordering::Relaxed);
73 }
74
75 fn render(&self, name: &str, out: &mut String) {
77 let _ = writeln!(out, "# TYPE {} gauge", name);
78 for r in self.map.iter() {
79 let key = r.key();
80 let val = r.value().load(Ordering::Relaxed);
81 let label_str = key.iter()
82 .map(|(k, v)| format!("{}=\"{}\"", k, escape_label(v)))
83 .collect::<Vec<_>>().join(",");
84 let _ = writeln!(out, "{}{{{}}} {}", name, label_str, val);
85 }
86 }
87}
88
/// Upper bounds of the finite histogram buckets, in microseconds, ascending.
const BUCKETS_MICROS: [u64; 9] = [
    100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000,
];

/// Atomic storage for a single histogram series.
///
/// `Default` yields a histogram with every counter at zero. The bucket array
/// is sized from `BUCKETS_MICROS`, so changing the bounds automatically keeps
/// the storage in step (the derived `Default` supports arrays of up to 32
/// elements).
#[derive(Default)]
struct AtomicHistogram {
    /// Number of recorded observations.
    count: AtomicU64,
    /// Running total of the recorded values.
    sum: AtomicU64,
    /// One counter per bound in `BUCKETS_MICROS`, in the same order.
    buckets: [AtomicU64; BUCKETS_MICROS.len()],
}
112
113#[derive(Default)]
114pub struct HistogramVec {
115 map: DashMap<Vec<(String, String)>, AtomicHistogram>,
116}
117
118impl HistogramVec {
119 pub fn observe(&self, labels: &[(&str, &str)], duration: Duration) {
121 let mut key: Vec<(String, String)> = labels.iter()
122 .map(|(k, v)| (k.to_string(), v.to_string()))
123 .collect();
124 key.sort();
125
126 let hist = self.map.entry(key).or_insert_with(AtomicHistogram::default);
127 let micros = duration.as_micros() as u64;
128
129 hist.count.fetch_add(1, Ordering::Relaxed);
130 hist.sum.fetch_add(micros, Ordering::Relaxed); for (i, &b) in BUCKETS_MICROS.iter().enumerate() {
134 if micros <= b {
135 hist.buckets[i].fetch_add(1, Ordering::Relaxed);
136 }
137 }
138 }
139
140 fn render(&self, name: &str, out: &mut String) {
142 let _ = writeln!(out, "# TYPE {} histogram", name);
143 for r in self.map.iter() {
144 let key = r.key();
145 let hist = r.value();
146
147 let label_str = key.iter()
148 .map(|(k, v)| format!("{}=\"{}\"", k, escape_label(v)))
149 .collect::<Vec<_>>().join(",");
150 let prefix = if label_str.is_empty() { String::new() } else { format!("{},", label_str) };
151
152 for (i, &le) in BUCKETS_MICROS.iter().enumerate() {
153 let count = hist.buckets[i].load(Ordering::Relaxed);
154 let _ = writeln!(out, "{}_bucket{{{}le=\"{}\"}} {}", name, prefix, le, count);
159 }
160 let count = hist.count.load(Ordering::Relaxed);
161 let _ = writeln!(out, "{}_bucket{{{}le=\"+Inf\"}} {}", name, prefix, count);
162
163 let sum = hist.sum.load(Ordering::Relaxed);
164 let _ = writeln!(out, "{}_sum{{{}}} {}", name, label_str, sum);
165 let _ = writeln!(out, "{}_count{{{}}} {}", name, label_str, count);
166 }
167 }
168}
169
170#[derive(Default)]
171pub struct GatewayMetrics {
172 pub ws_upgrades: CounterVec,
173 pub ws_active_sessions: GaugeVec,
174 pub policy_decisions: CounterVec,
175 pub handshake_rejections: CounterVec,
176 pub dispatch_duration: HistogramVec, pub decode_errors: CounterVec,
178 pub service_errors: CounterVec,
179 pub writer_timeouts: CounterVec,
180 pub unknown_service_errors: CounterVec,
181 draining: std::sync::atomic::AtomicBool,
182}
183
184impl GatewayMetrics {
185 pub fn set_draining(&self) { self.draining.store(true, Ordering::Relaxed); }
187 pub fn is_draining(&self) -> bool { self.draining.load(Ordering::Relaxed) }
189
190 pub fn render(&self, extra: &[(&str, u64)]) -> String {
192 let mut out = String::new();
193 self.ws_upgrades.render("wsprism_ws_upgrades_total", &mut out);
194 self.ws_active_sessions.render("wsprism_ws_sessions_active", &mut out);
195 self.policy_decisions.render("wsprism_policy_decisions_total", &mut out);
196 self.handshake_rejections.render("wsprism_handshake_rejections_total", &mut out);
197 self.dispatch_duration.render("wsprism_dispatch_duration_micros", &mut out); self.decode_errors.render("wsprism_decode_errors_total", &mut out);
199 self.service_errors.render("wsprism_service_errors_total", &mut out);
200 self.writer_timeouts.render("wsprism_writer_timeouts_total", &mut out);
201 self.unknown_service_errors.render("wsprism_unknown_service_total", &mut out);
202
203 let _ = writeln!(out, "# TYPE wsprism_draining gauge\nwsprism_draining {}", if self.is_draining() { 1 } else { 0 });
204 for (k, v) in extra { let _ = writeln!(out, "{} {}", k, v); }
205 out
206 }
207}