wsprism_gateway/realtime/core/
presence.rs

use dashmap::{DashMap, DashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use wsprism_core::error::{Result, WsPrismError};
use crate::config::schema::TenantLimits;

/// Room presence: `room_key -> sessions`, `session_key -> rooms`.
///
/// Sprint 5: Added user-level indexing and tenant counters for governance.
/// Lock-free best-effort design: under heavy contention, limits can be
/// temporarily exceeded by a small margin to preserve throughput.
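///
/// # Example
///
/// A minimal usage sketch. Marked `ignore` because constructing
/// [`TenantLimits`] depends on the config schema; the three limit fields
/// shown are assumed to be settable directly and any other fields to have
/// defaults.
///
/// ```ignore
/// let presence = Presence::new();
/// let limits = TenantLimits {
///     max_users_per_room: 100,
///     max_rooms_per_user: 20,
///     max_rooms_total: 1_000,
///     ..Default::default()
/// };
/// presence
///     .try_join("tenant-a", "room:lobby", "user:42", "sess:1", &limits)
///     .expect("limits not exceeded");
/// assert_eq!(presence.sessions_in("room:lobby"), vec!["sess:1".to_string()]);
/// presence.leave("tenant-a", "room:lobby", "user:42", "sess:1");
/// ```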
#[derive(Default)]
pub struct Presence {
    // Routing indices
    room_to_sessions: DashMap<String, DashSet<String>>,
    session_to_rooms: DashMap<String, DashSet<String>>,

    // Governance indices
    room_to_users: DashMap<String, DashSet<String>>,
    user_to_rooms: DashMap<String, DashSet<String>>,

    // Multi-session ref-counting: `user::room` -> session_count
    user_room_refs: DashMap<String, usize>,

    // O(1) counters: tenant_id -> number of live (non-empty) rooms
    tenant_room_counts: DashMap<String, AtomicU64>,
}

impl Presence {
    pub fn new() -> Self {
        Self {
            room_to_sessions: DashMap::new(),
            session_to_rooms: DashMap::new(),
            room_to_users: DashMap::new(),
            user_to_rooms: DashMap::new(),
            user_room_refs: DashMap::new(),
            tenant_room_counts: DashMap::new(),
        }
    }

    /// Attempt to join a room, enforcing per-tenant, per-user, and per-room limits.
    ///
    /// Concurrency note: this uses lock-free counters and map lookups for
    /// performance. The check (e.g. `current < limit`) and the subsequent
    /// insert are not atomic across threads, so at high contention a limit
    /// may be exceeded by a small margin (an acceptable trade-off for
    /// throughput). Do not rely on this for exact “hard” ceilings; it is
    /// designed for fast, best-effort enforcement in a single-node gateway.
    /// A usage sketch lives in the `tests` module at the bottom of this file.
    pub fn try_join(
        &self,
        tenant_id: &str,
        room_key: &str,
        user_key: &str,
        session_key: &str,
        limits: &TenantLimits,
    ) -> Result<()> {
        // --- 1. Check room capacity (max users per room) ---
        if limits.max_users_per_room > 0 {
            if let Some(users) = self.room_to_users.get(room_key) {
                // Only reject if this would add a NEW user beyond the limit;
                // extra sessions for a user already in the room are allowed.
                if users.len() as u64 >= limits.max_users_per_room && !users.contains(user_key) {
                    return Err(WsPrismError::ResourceExhausted("room user limit reached".into()));
                }
            }
        }

        // --- 2. Check user's room limit (max rooms per user) ---
        if limits.max_rooms_per_user > 0 {
            if let Some(rooms) = self.user_to_rooms.get(user_key) {
                if rooms.len() as u64 >= limits.max_rooms_per_user && !rooms.contains(room_key) {
                    return Err(WsPrismError::ResourceExhausted("user room limit reached".into()));
                }
            }
        }

        // --- 3. Check tenant total rooms ---
        // Only count the room as new when it currently has no sessions: a room
        // "exists" for counting purposes iff at least one session is in it.
        let is_new_room = !self.room_to_sessions.contains_key(room_key);
        if is_new_room {
            let counter = self.tenant_room_counts.entry(tenant_id.to_string()).or_default();
            if limits.max_rooms_total > 0 && counter.load(Ordering::Relaxed) >= limits.max_rooms_total {
                return Err(WsPrismError::ResourceExhausted("tenant room limit reached".into()));
            }
            counter.fetch_add(1, Ordering::Relaxed);
        }

        // --- 4. Perform join (order: routing -> governance) ---

        // A. Routing
        self.room_to_sessions.entry(room_key.to_string()).or_default().insert(session_key.to_string());
        self.session_to_rooms.entry(session_key.to_string()).or_default().insert(room_key.to_string());

        // B. Governance (ref-counting for multi-session support)
        let ref_key = format!("{}::{}", user_key, room_key);
        let mut refs = self.user_room_refs.entry(ref_key).or_insert(0);
        *refs += 1;

        // First session for this user in this room: add to the user indices.
        if *refs == 1 {
            self.room_to_users.entry(room_key.to_string()).or_default().insert(user_key.to_string());
            self.user_to_rooms.entry(user_key.to_string()).or_default().insert(room_key.to_string());
        }

        Ok(())
    }

    pub fn leave(&self, tenant_id: &str, room_key: &str, user_key: &str, session_key: &str) {
        // 1. Remove from routing
        let mut room_empty = false;
        if let Some(set) = self.room_to_sessions.get(room_key) {
            set.remove(session_key);
            room_empty = set.is_empty();
        }
        // The read guard is dropped at the end of the `if let`, so removing the
        // now-empty set here never re-locks a shard we are still holding.
        if room_empty { self.room_to_sessions.remove(room_key); }

        if let Some(set) = self.session_to_rooms.get(session_key) {
            set.remove(room_key);
            if set.is_empty() { drop(set); self.session_to_rooms.remove(session_key); }
        }

        // 2. Remove from governance (ref-counting)
        let ref_key = format!("{}::{}", user_key, room_key);
        let mut remove_user_mapping = false;

        if let Some(mut refs) = self.user_room_refs.get_mut(&ref_key) {
            *refs -= 1;
            if *refs == 0 {
                remove_user_mapping = true;
            }
        }
        if remove_user_mapping {
            self.user_room_refs.remove(&ref_key);

            // Remove from user_to_rooms
            if let Some(set) = self.user_to_rooms.get(user_key) {
                set.remove(room_key);
                if set.is_empty() { drop(set); self.user_to_rooms.remove(user_key); }
            }

            // Remove from room_to_users
            if let Some(set) = self.room_to_users.get(room_key) {
                set.remove(user_key);
                if set.is_empty() { drop(set); self.room_to_users.remove(room_key); }
            }
        }

        // 3. Decrement the tenant room count if the room just became empty.
        // Saturate at zero: under races a plain `fetch_sub` could wrap to
        // u64::MAX and permanently exhaust the tenant's room limit.
        if room_empty {
            if let Some(counter) = self.tenant_room_counts.get(tenant_id) {
                let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |c| c.checked_sub(1));
            }
        }
    }

    pub fn sessions_in(&self, room_key: &str) -> Vec<String> {
        self.room_to_sessions.get(room_key)
            .map(|set| set.iter().map(|s| s.key().to_string()).collect())
            .unwrap_or_default()
    }

    /// Removes the session from every room it joined. Called by the RAII
    /// session guard on `Drop`.
    pub fn cleanup_session(&self, tenant_id: &str, user_key: &str, session_key: &str) {
        if let Some(rooms) = self.session_to_rooms.remove(session_key).map(|(_, v)| v) {
            for r in rooms.iter() {
                let room_key = r.key();
                // Use the full leave logic so ref-counts and limits stay consistent.
                self.leave(tenant_id, room_key, user_key, session_key);
            }
        }
    }
}
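
// A small test sketch of the governance behavior above. Assumption: `TenantLimits`
// derives `Default` and the three limit fields used in `try_join` are plain fields
// settable from this crate; adjust the `limits` helper if the real schema differs.
#[cfg(test)]
mod tests {
    use super::*;

    // Hypothetical helper: build limits with everything else defaulted.
    fn limits(users_per_room: u64, rooms_per_user: u64, rooms_total: u64) -> TenantLimits {
        let mut l = TenantLimits::default();
        l.max_users_per_room = users_per_room;
        l.max_rooms_per_user = rooms_per_user;
        l.max_rooms_total = rooms_total;
        l
    }

    #[test]
    fn room_user_limit_allows_multi_session_but_blocks_new_users() {
        let p = Presence::new();
        let lim = limits(1, 0, 0); // at most one distinct user per room

        assert!(p.try_join("t1", "room:a", "user:1", "sess:1", &lim).is_ok());
        // A second session for the same user is not a new user, so it passes.
        assert!(p.try_join("t1", "room:a", "user:1", "sess:2", &lim).is_ok());
        // A different user exceeds the per-room user limit.
        assert!(p.try_join("t1", "room:a", "user:2", "sess:3", &lim).is_err());
    }

    #[test]
    fn cleanup_session_releases_every_room() {
        let p = Presence::new();
        let lim = limits(0, 0, 0); // all limits disabled

        assert!(p.try_join("t1", "room:a", "user:1", "sess:1", &lim).is_ok());
        assert!(p.try_join("t1", "room:b", "user:1", "sess:1", &lim).is_ok());
        assert_eq!(p.sessions_in("room:a"), vec!["sess:1".to_string()]);

        // Simulates the RAII guard's Drop: the session leaves both rooms.
        p.cleanup_session("t1", "user:1", "sess:1");
        assert!(p.sessions_in("room:a").is_empty());
        assert!(p.sessions_in("room:b").is_empty());
    }
}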