// wsprism_gateway/realtime/core/realtime.rs

//! Realtime egress engine and per-message context.
//!
//! The realtime core fans out messages to users, sessions, or rooms with
//! lossy or reliable QoS, and keeps lightweight relaxed-atomic counters for
//! observability. It is intentionally lock-free: under high contention, small
//! temporary overshoots are possible in exchange for throughput.

8use std::borrow::Cow;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use axum::extract::ws::{CloseFrame, Message};
12use futures_util::stream::FuturesUnordered;
13use futures_util::StreamExt;
14use tokio::time::{timeout, Duration};
15use wsprism_core::error::{Result, WsPrismError};
16use crate::realtime::core::{Presence, SessionRegistry};
17use crate::realtime::types::{Outgoing, PreparedMsg, QoS};
18use crate::config::schema::TenantLimits;
19
/// Process-wide number of egress frames whose non-blocking `try_send` was rejected.
static DROP_COUNT: AtomicU64 = AtomicU64::new(0);
/// Process-wide number of reliable sends recorded as failed or timed out.
static SEND_FAIL_COUNT: AtomicU64 = AtomicU64::new(0);

/// Current egress drop total (relaxed snapshot; may lag concurrent updates).
pub fn egress_drop_count() -> u64 {
    DROP_COUNT.load(Ordering::Relaxed)
}

/// Current reliable-send failure total (relaxed snapshot; may lag concurrent updates).
pub fn egress_send_fail_count() -> u64 {
    SEND_FAIL_COUNT.load(Ordering::Relaxed)
}

/// Log-sampling predicate: true for `n` = 1, 1025, 2049, … (`n % 1024 == 1`).
///
/// Pass a 1-based running count so that the very first event is sampled.
fn sample_every_1024(n: u64) -> bool {
    n % 1024 == 1
}
26
27pub struct RealtimeCore {
28    pub sessions: Arc<SessionRegistry>,
29    pub presence: Arc<Presence>,
30}
31
32impl RealtimeCore {
33    /// Create a new realtime core with fresh registries.
34    pub fn new() -> Self {
35        Self {
36            sessions: Arc::new(SessionRegistry::new()),
37            presence: Arc::new(Presence::new()),
38        }
39    }
40
41    /// Send Close frames to all sessions during draining (best-effort).
42    pub fn best_effort_shutdown_all(&self, reason: &str) {
43        let sessions = self.sessions.all_sessions();
44        for (session_key, conn) in sessions {
45            let frame = CloseFrame {
46                code: 1001,
47                reason: Cow::from(reason.to_string()),
48            };
49            if conn.tx.try_send(Message::Close(Some(frame))).is_err() {
50                let n = DROP_COUNT.fetch_add(1, Ordering::Relaxed);
51                if sample_every_1024(n) {
52                    tracing::warn!(%session_key, drops=%n, "egress drop while draining");
53                }
54            }
55        }
56    }
57
58    pub fn send_to_user(&self, user_key: &str, out: Outgoing) -> Result<()> {
59        let conns = self.sessions.get_user_sessions(user_key);
60        if conns.is_empty() {
61            return Err(WsPrismError::BadRequest("user not connected".into()));
62        }
63        let prepared = PreparedMsg::prepare(&out)?;
64        for c in conns {
65            if c.tx.try_send(prepared.to_ws_message()).is_err() {
66                let n = DROP_COUNT.fetch_add(1, Ordering::Relaxed);
67                if sample_every_1024(n) { tracing::warn!(user_key=%user_key, drops=%n, "egress drop"); }
68            }
69        }
70        Ok(())
71    }
72
73    /// Send to a single session. Queue-full drops are sampled and logged.
74    pub fn send_to_session(&self, session_key: &str, out: Outgoing) -> Result<()> {
75        let conn = self.sessions.get_session(session_key)
76            .ok_or_else(|| WsPrismError::BadRequest("session not connected".into()))?;
77        let prepared = PreparedMsg::prepare(&out)?;
78        if conn.tx.try_send(prepared.to_ws_message()).is_err() {
79            let n = DROP_COUNT.fetch_add(1, Ordering::Relaxed);
80            if sample_every_1024(n) { tracing::warn!(%session_key, "send_to_session dropped"); }
81        }
82        Ok(())
83    }
84
85    pub fn publish_room_lossy(&self, room_key: &str, out: Outgoing) -> Result<()> {
86        let prepared = PreparedMsg::prepare(&out)?;
87        let sessions = self.presence.sessions_in(room_key);
88        for sid in sessions {
89            if let Some(conn) = self.sessions.get_session(&sid) {
90                if conn.tx.try_send(prepared.to_ws_message()).is_err() {
91                    let n = DROP_COUNT.fetch_add(1, Ordering::Relaxed);
92                    if sample_every_1024(n) { tracing::warn!(room_key=%room_key, drops=%n, "lossy drop"); }
93                }
94            }
95        }
96        Ok(())
97    }
98
99    pub async fn publish_room_reliable(&self, room_key: &str, out: Outgoing) -> Result<()> {
100        let prepared = PreparedMsg::prepare(&out)?;
101        let sessions = self.presence.sessions_in(room_key);
102        let (timeout_ms, do_timeout) = match out.qos {
103            QoS::Reliable { timeout_ms } => (timeout_ms, timeout_ms > 0),
104            _ => (0, false),
105        };
106        let mut futs = FuturesUnordered::new();
107        for sid in sessions {
108            if let Some(conn) = self.sessions.get_session(&sid) {
109                let msg = prepared.to_ws_message();
110                futs.push(async move {
111                    if do_timeout {
112                        if timeout(Duration::from_millis(timeout_ms), conn.tx.send(msg)).await.is_err() {
113                            let n = SEND_FAIL_COUNT.fetch_add(1, Ordering::Relaxed);
114                            if sample_every_1024(n) { tracing::warn!(fails=%n, "reliable send timeout"); }
115                        }
116                    } else if conn.tx.send(msg).await.is_err() {
117                        let n = SEND_FAIL_COUNT.fetch_add(1, Ordering::Relaxed);
118                        if sample_every_1024(n) { tracing::warn!(fails=%n, "reliable send failed"); }
119                    }
120                });
121            }
122        }
123        while futs.next().await.is_some() {}
124        Ok(())
125    }
126}
127
128#[derive(Clone)]
129pub struct RealtimeCtx {
130    tenant: Arc<str>,
131    user: Arc<str>,
132    user_key: Arc<str>,
133    session_id: Arc<str>,
134    session_key: Arc<str>,
135    pub trace_id: Arc<str>,
136    active_room: Option<Arc<str>>,
137    core: Arc<RealtimeCore>,
138}
139
140impl RealtimeCtx {
141    /// Construct a per-message context with immutable identity and trace fields.
142    pub fn new(
143        tenant: impl Into<Arc<str>>,
144        user: impl Into<Arc<str>>,
145        session_id: impl Into<Arc<str>>,
146        trace_id: impl Into<Arc<str>>,
147        active_room: Option<String>,
148        core: Arc<RealtimeCore>,
149    ) -> Self {
150        let tenant = tenant.into();
151        let user = user.into();
152        let session_id = session_id.into();
153        let user_key: Arc<str> = Arc::from(format!("{}::{}", tenant, user));
154        let session_key: Arc<str> = Arc::from(format!("{}::{}::{}", tenant, user, session_id));
155
156        Self {
157            tenant,
158            user,
159            user_key,
160            session_id,
161            session_key,
162            trace_id: trace_id.into(),
163            active_room: active_room.map(Arc::from),
164            core,
165        }
166    }
167
168    pub fn tenant(&self) -> &str { &self.tenant }
169    pub fn user(&self) -> &str { &self.user }
170    pub fn user_key(&self) -> &str { &self.user_key }
171    pub fn session_id(&self) -> &str { &self.session_id }
172    pub fn session_key(&self) -> &str { &self.session_key }
173    pub fn active_room(&self) -> Option<&str> { self.active_room.as_deref() }
174
175    fn room_key(&self, room: &str) -> String { format!("{}::{}", self.tenant(), room) }
176
177    pub fn join_room_with_limits(&self, room: &str, limits: &TenantLimits) -> Result<()> {
178        let rk = self.room_key(room);
179        self.core.presence.try_join(self.tenant(), &rk, self.user_key(), self.session_key(), limits)
180    }
181
182    pub fn leave_room(&self, room: &str) {
183        let rk = self.room_key(room);
184        self.core.presence.leave(self.tenant(), &rk, self.user_key(), self.session_key());
185    }
186
187    pub fn send_to_user(&self, out: Outgoing) -> Result<()> { self.core.send_to_user(self.user_key(), out) }
188    pub fn send_to_session(&self, out: Outgoing) -> Result<()> { self.core.send_to_session(self.session_key(), out) }
189    pub fn publish_room_lossy(&self, room: &str, out: Outgoing) -> Result<()> {
190        let rk = self.room_key(room);
191        self.core.publish_room_lossy(&rk, out)
192    }
193    pub async fn publish_room_reliable(&self, room: &str, out: Outgoing) -> Result<()> {
194        let rk = self.room_key(room);
195        self.core.publish_room_reliable(&rk, out).await
196    }
197}