wsprism_gateway/realtime/core/
presence.rs1use dashmap::{DashMap, DashSet};
2use std::sync::atomic::{AtomicU64, Ordering};
3use wsprism_core::error::{Result, WsPrismError};
4use crate::config::schema::TenantLimits;
5
6#[derive(Default)]
12pub struct Presence {
13 room_to_sessions: DashMap<String, DashSet<String>>,
15 session_to_rooms: DashMap<String, DashSet<String>>,
16
17 room_to_users: DashMap<String, DashSet<String>>,
19 user_to_rooms: DashMap<String, DashSet<String>>,
20
21 user_room_refs: DashMap<String, usize>,
23
24 tenant_room_counts: DashMap<String, AtomicU64>,
26}
27
28impl Presence {
29 pub fn new() -> Self {
30 Self {
31 room_to_sessions: DashMap::new(),
32 session_to_rooms: DashMap::new(),
33 room_to_users: DashMap::new(),
34 user_to_rooms: DashMap::new(),
35 user_room_refs: DashMap::new(),
36 tenant_room_counts: DashMap::new(),
37 }
38 }
39
40 pub fn try_join(
49 &self,
50 tenant_id: &str,
51 room_key: &str,
52 user_key: &str,
53 session_key: &str,
54 limits: &TenantLimits
55 ) -> Result<()> {
56
57 if limits.max_users_per_room > 0 {
59 if let Some(users) = self.room_to_users.get(room_key) {
60 if users.len() as u64 >= limits.max_users_per_room && !users.contains(user_key) {
62 return Err(WsPrismError::ResourceExhausted("room user limit reached".into()));
63 }
64 }
65 }
66
67 if limits.max_rooms_per_user > 0 {
69 if let Some(rooms) = self.user_to_rooms.get(user_key) {
70 if rooms.len() as u64 >= limits.max_rooms_per_user && !rooms.contains(room_key) {
71 return Err(WsPrismError::ResourceExhausted("user room limit reached".into()));
72 }
73 }
74 }
75
76 let is_new_room = !self.room_to_sessions.contains_key(room_key);
80 if is_new_room && limits.max_rooms_total > 0 {
81 let counter = self.tenant_room_counts.entry(tenant_id.to_string()).or_insert_with(|| AtomicU64::new(0));
82 if counter.load(Ordering::Relaxed) >= limits.max_rooms_total {
83 return Err(WsPrismError::ResourceExhausted("tenant room limit reached".into()));
84 }
85 counter.fetch_add(1, Ordering::Relaxed);
87 } else if is_new_room {
88 self.tenant_room_counts.entry(tenant_id.to_string()).or_default().fetch_add(1, Ordering::Relaxed);
89 }
90
91 self.room_to_sessions.entry(room_key.to_string()).or_default().insert(session_key.to_string());
95 self.session_to_rooms.entry(session_key.to_string()).or_default().insert(room_key.to_string());
96
97 let ref_key = format!("{}::{}", user_key, room_key);
99 let mut refs = self.user_room_refs.entry(ref_key).or_insert(0);
100 *refs += 1;
101
102 if *refs == 1 {
104 self.room_to_users.entry(room_key.to_string()).or_default().insert(user_key.to_string());
105 self.user_to_rooms.entry(user_key.to_string()).or_default().insert(room_key.to_string());
106 }
107
108 Ok(())
109 }
110
111 pub fn leave(&self, tenant_id: &str, room_key: &str, user_key: &str, session_key: &str) {
112 let mut room_empty = false;
114 if let Some(set) = self.room_to_sessions.get(room_key) {
115 set.remove(session_key);
116 room_empty = set.is_empty();
117 }
118 if room_empty { self.room_to_sessions.remove(room_key); }
120
121 if let Some(set) = self.session_to_rooms.get(session_key) {
122 set.remove(room_key);
123 if set.is_empty() { drop(set); self.session_to_rooms.remove(session_key); }
124 }
125
126 let ref_key = format!("{}::{}", user_key, room_key);
128 let mut remove_user_mapping = false;
129
130 if let Some(mut refs) = self.user_room_refs.get_mut(&ref_key) {
131 *refs -= 1;
132 if *refs == 0 {
133 remove_user_mapping = true;
134 }
135 }
136 if remove_user_mapping {
137 self.user_room_refs.remove(&ref_key);
138
139 if let Some(set) = self.user_to_rooms.get(user_key) {
141 set.remove(room_key);
142 if set.is_empty() { drop(set); self.user_to_rooms.remove(user_key); }
143 }
144
145 if let Some(set) = self.room_to_users.get(room_key) {
147 set.remove(user_key);
148 if set.is_empty() { drop(set); self.room_to_users.remove(room_key); }
149 }
150 }
151
152 if room_empty {
154 if let Some(counter) = self.tenant_room_counts.get(tenant_id) {
155 counter.fetch_sub(1, Ordering::Relaxed);
156 }
157 }
158 }
159
160 pub fn sessions_in(&self, room_key: &str) -> Vec<String> {
161 self.room_to_sessions.get(room_key)
162 .map(|set| set.iter().map(|u| u.key().to_string()).collect())
163 .unwrap_or_default()
164 }
165
166 pub fn cleanup_session(&self, tenant_id: &str, user_key: &str, session_key: &str) {
168 if let Some(rooms) = self.session_to_rooms.remove(session_key).map(|(_, v)| v) {
169 for r in rooms.iter() {
170 let room_key = r.key();
171 self.leave(tenant_id, room_key, user_key, session_key);
173 }
174 }
175 }
176}