wsprism_gateway/realtime/core/
realtime.rs1use std::borrow::Cow;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use axum::extract::ws::{CloseFrame, Message};
12use futures_util::stream::FuturesUnordered;
13use futures_util::StreamExt;
14use tokio::time::{timeout, Duration};
15use wsprism_core::error::{Result, WsPrismError};
16use crate::realtime::core::{Presence, SessionRegistry};
17use crate::realtime::types::{Outgoing, PreparedMsg, QoS};
18use crate::config::schema::TenantLimits;
19
/// Process-wide tally of outbound messages that were discarded because a
/// non-blocking `try_send` to a connection failed.
static DROP_COUNT: AtomicU64 = AtomicU64::new(0);

/// Process-wide tally of awaited ("reliable") sends that did not complete.
static SEND_FAIL_COUNT: AtomicU64 = AtomicU64::new(0);

/// Returns the current value of the egress drop counter.
///
/// The read uses relaxed ordering: the value is a statistic, not a
/// synchronization point.
pub fn egress_drop_count() -> u64 {
    DROP_COUNT.load(Ordering::Relaxed)
}

/// Returns the current value of the reliable-send failure counter.
///
/// The read uses relaxed ordering: the value is a statistic, not a
/// synchronization point.
pub fn egress_send_fail_count() -> u64 {
    SEND_FAIL_COUNT.load(Ordering::Relaxed)
}
/// Log-sampling predicate: true when `n` leaves remainder 1 after division by
/// 1024, i.e. for 1, 1025, 2049, ... and false for everything else
/// (including 0).
fn sample_every_1024(n: u64) -> bool {
    n % 1024 == 1
}
26
27pub struct RealtimeCore {
28 pub sessions: Arc<SessionRegistry>,
29 pub presence: Arc<Presence>,
30}
31
32impl RealtimeCore {
33 pub fn new() -> Self {
35 Self {
36 sessions: Arc::new(SessionRegistry::new()),
37 presence: Arc::new(Presence::new()),
38 }
39 }
40
41 pub fn best_effort_shutdown_all(&self, reason: &str) {
43 let sessions = self.sessions.all_sessions();
44 for (session_key, conn) in sessions {
45 let frame = CloseFrame {
46 code: 1001,
47 reason: Cow::from(reason.to_string()),
48 };
49 if conn.tx.try_send(Message::Close(Some(frame))).is_err() {
50 let n = DROP_COUNT.fetch_add(1, Ordering::Relaxed);
51 if sample_every_1024(n) {
52 tracing::warn!(%session_key, drops=%n, "egress drop while draining");
53 }
54 }
55 }
56 }
57
58 pub fn send_to_user(&self, user_key: &str, out: Outgoing) -> Result<()> {
59 let conns = self.sessions.get_user_sessions(user_key);
60 if conns.is_empty() {
61 return Err(WsPrismError::BadRequest("user not connected".into()));
62 }
63 let prepared = PreparedMsg::prepare(&out)?;
64 for c in conns {
65 if c.tx.try_send(prepared.to_ws_message()).is_err() {
66 let n = DROP_COUNT.fetch_add(1, Ordering::Relaxed);
67 if sample_every_1024(n) { tracing::warn!(user_key=%user_key, drops=%n, "egress drop"); }
68 }
69 }
70 Ok(())
71 }
72
73 pub fn send_to_session(&self, session_key: &str, out: Outgoing) -> Result<()> {
75 let conn = self.sessions.get_session(session_key)
76 .ok_or_else(|| WsPrismError::BadRequest("session not connected".into()))?;
77 let prepared = PreparedMsg::prepare(&out)?;
78 if conn.tx.try_send(prepared.to_ws_message()).is_err() {
79 let n = DROP_COUNT.fetch_add(1, Ordering::Relaxed);
80 if sample_every_1024(n) { tracing::warn!(%session_key, "send_to_session dropped"); }
81 }
82 Ok(())
83 }
84
85 pub fn publish_room_lossy(&self, room_key: &str, out: Outgoing) -> Result<()> {
86 let prepared = PreparedMsg::prepare(&out)?;
87 let sessions = self.presence.sessions_in(room_key);
88 for sid in sessions {
89 if let Some(conn) = self.sessions.get_session(&sid) {
90 if conn.tx.try_send(prepared.to_ws_message()).is_err() {
91 let n = DROP_COUNT.fetch_add(1, Ordering::Relaxed);
92 if sample_every_1024(n) { tracing::warn!(room_key=%room_key, drops=%n, "lossy drop"); }
93 }
94 }
95 }
96 Ok(())
97 }
98
99 pub async fn publish_room_reliable(&self, room_key: &str, out: Outgoing) -> Result<()> {
100 let prepared = PreparedMsg::prepare(&out)?;
101 let sessions = self.presence.sessions_in(room_key);
102 let (timeout_ms, do_timeout) = match out.qos {
103 QoS::Reliable { timeout_ms } => (timeout_ms, timeout_ms > 0),
104 _ => (0, false),
105 };
106 let mut futs = FuturesUnordered::new();
107 for sid in sessions {
108 if let Some(conn) = self.sessions.get_session(&sid) {
109 let msg = prepared.to_ws_message();
110 futs.push(async move {
111 if do_timeout {
112 if timeout(Duration::from_millis(timeout_ms), conn.tx.send(msg)).await.is_err() {
113 let n = SEND_FAIL_COUNT.fetch_add(1, Ordering::Relaxed);
114 if sample_every_1024(n) { tracing::warn!(fails=%n, "reliable send timeout"); }
115 }
116 } else if conn.tx.send(msg).await.is_err() {
117 let n = SEND_FAIL_COUNT.fetch_add(1, Ordering::Relaxed);
118 if sample_every_1024(n) { tracing::warn!(fails=%n, "reliable send failed"); }
119 }
120 });
121 }
122 }
123 while futs.next().await.is_some() {}
124 Ok(())
125 }
126}
127
128#[derive(Clone)]
129pub struct RealtimeCtx {
130 tenant: Arc<str>,
131 user: Arc<str>,
132 user_key: Arc<str>,
133 session_id: Arc<str>,
134 session_key: Arc<str>,
135 pub trace_id: Arc<str>,
136 active_room: Option<Arc<str>>,
137 core: Arc<RealtimeCore>,
138}
139
140impl RealtimeCtx {
141 pub fn new(
143 tenant: impl Into<Arc<str>>,
144 user: impl Into<Arc<str>>,
145 session_id: impl Into<Arc<str>>,
146 trace_id: impl Into<Arc<str>>,
147 active_room: Option<String>,
148 core: Arc<RealtimeCore>,
149 ) -> Self {
150 let tenant = tenant.into();
151 let user = user.into();
152 let session_id = session_id.into();
153 let user_key: Arc<str> = Arc::from(format!("{}::{}", tenant, user));
154 let session_key: Arc<str> = Arc::from(format!("{}::{}::{}", tenant, user, session_id));
155
156 Self {
157 tenant,
158 user,
159 user_key,
160 session_id,
161 session_key,
162 trace_id: trace_id.into(),
163 active_room: active_room.map(Arc::from),
164 core,
165 }
166 }
167
168 pub fn tenant(&self) -> &str { &self.tenant }
169 pub fn user(&self) -> &str { &self.user }
170 pub fn user_key(&self) -> &str { &self.user_key }
171 pub fn session_id(&self) -> &str { &self.session_id }
172 pub fn session_key(&self) -> &str { &self.session_key }
173 pub fn active_room(&self) -> Option<&str> { self.active_room.as_deref() }
174
175 fn room_key(&self, room: &str) -> String { format!("{}::{}", self.tenant(), room) }
176
177 pub fn join_room_with_limits(&self, room: &str, limits: &TenantLimits) -> Result<()> {
178 let rk = self.room_key(room);
179 self.core.presence.try_join(self.tenant(), &rk, self.user_key(), self.session_key(), limits)
180 }
181
182 pub fn leave_room(&self, room: &str) {
183 let rk = self.room_key(room);
184 self.core.presence.leave(self.tenant(), &rk, self.user_key(), self.session_key());
185 }
186
187 pub fn send_to_user(&self, out: Outgoing) -> Result<()> { self.core.send_to_user(self.user_key(), out) }
188 pub fn send_to_session(&self, out: Outgoing) -> Result<()> { self.core.send_to_session(self.session_key(), out) }
189 pub fn publish_room_lossy(&self, room: &str, out: Outgoing) -> Result<()> {
190 let rk = self.room_key(room);
191 self.core.publish_room_lossy(&rk, out)
192 }
193 pub async fn publish_room_reliable(&self, room: &str, out: Outgoing) -> Result<()> {
194 let rk = self.room_key(room);
195 self.core.publish_room_reliable(&rk, out).await
196 }
197}