// user_awareness.rs
  1. use std::sync::{Arc, Weak};
  2. use appflowy_integrate::reminder::Reminder;
  3. use appflowy_integrate::{CollabType, RocksCollabDB};
  4. use collab::core::collab::{CollabRawData, MutexCollab};
  5. use collab_user::core::{MutexUserAwareness, UserAwareness};
  6. use flowy_error::{ErrorCode, FlowyError, FlowyResult};
  7. use crate::entities::ReminderPB;
  8. use crate::manager::UserManager;
  9. use crate::services::entities::Session;
  10. impl UserManager {
  11. /// Adds a new reminder based on the given payload.
  12. ///
  13. /// This function creates a new `Reminder` from the provided payload and adds it to the user's reminders.
  14. /// It leverages the `with_awareness` function to ensure the reminder is added in the context of the
  15. /// current user's awareness.
  16. ///
  17. /// # Parameters
  18. /// - `reminder_pb`: The pb for the new reminder.
  19. ///
  20. /// # Returns
  21. /// - Returns `Ok(())` if the reminder is successfully added.
  22. /// - May return errors of type `FlowyError` if any issues arise during the process.
  23. ///
  24. pub async fn add_reminder(&self, reminder_pb: ReminderPB) -> FlowyResult<()> {
  25. let reminder = Reminder::from(reminder_pb);
  26. self
  27. .with_awareness((), |user_awareness| {
  28. user_awareness.add_reminder(reminder);
  29. })
  30. .await;
  31. Ok(())
  32. }
  33. /// Retrieves all reminders for the user.
  34. ///
  35. /// This function fetches all reminders associated with the current user. It leverages the
  36. /// `with_awareness` function to ensure the reminders are retrieved in the context of the
  37. /// current user's awareness.
  38. ///
  39. /// # Returns
  40. /// - Returns a vector of `Reminder` objects containing all reminders for the user.
  41. ///
  42. pub async fn get_all_reminders(&self) -> Vec<Reminder> {
  43. self
  44. .with_awareness(vec![], |user_awareness| user_awareness.get_all_reminders())
  45. .await
  46. }
  47. pub async fn initialize_user_awareness(
  48. &self,
  49. session: &Session,
  50. source: UserAwarenessDataSource,
  51. ) {
  52. match self.try_initial_user_awareness(session, source).await {
  53. Ok(_) => {
  54. tracing::trace!("User awareness initialized");
  55. },
  56. Err(e) => {
  57. tracing::error!("Failed to initialize user awareness: {:?}", e);
  58. },
  59. }
  60. }
  61. /// Initializes the user's awareness based on the specified data source.
  62. ///
  63. /// This asynchronous function attempts to initialize the user's awareness from either a local or remote data source.
  64. /// Depending on the chosen source, it will either construct the user awareness from an empty dataset or fetch it
  65. /// from a remote service. Once obtained, the user's awareness is stored in a shared mutex-protected structure.
  66. ///
  67. /// # Parameters
  68. /// - `session`: The current user's session data.
  69. /// - `source`: The source from which the user's awareness data should be obtained, either local or remote.
  70. ///
  71. /// # Returns
  72. /// - Returns `Ok(())` if the user's awareness is successfully initialized.
  73. /// - May return errors of type `FlowyError` if any issues arise during the initialization.
  74. async fn try_initial_user_awareness(
  75. &self,
  76. session: &Session,
  77. source: UserAwarenessDataSource,
  78. ) -> FlowyResult<()> {
  79. tracing::trace!("Initializing user awareness from {:?}", source);
  80. let collab_db = self.get_collab_db(session.user_id)?;
  81. let user_awareness = match source {
  82. UserAwarenessDataSource::Local => {
  83. let collab = self.collab_for_user_awareness(session, collab_db, vec![])?;
  84. MutexUserAwareness::new(UserAwareness::create(collab, None))
  85. },
  86. UserAwarenessDataSource::Remote => {
  87. let data = self
  88. .cloud_services
  89. .get_user_service()?
  90. .get_user_awareness_updates(session.user_id)
  91. .await?;
  92. let collab = self.collab_for_user_awareness(session, collab_db, data)?;
  93. MutexUserAwareness::new(UserAwareness::create(collab, None))
  94. },
  95. };
  96. self.user_awareness.lock().await.replace(user_awareness);
  97. Ok(())
  98. }
  99. /// Creates a collaboration instance tailored for user awareness.
  100. ///
  101. /// This function constructs a collaboration instance based on the given session and raw data,
  102. /// using a collaboration builder. This instance is specifically geared towards handling
  103. /// user awareness.
  104. fn collab_for_user_awareness(
  105. &self,
  106. session: &Session,
  107. collab_db: Weak<RocksCollabDB>,
  108. raw_data: CollabRawData,
  109. ) -> Result<Arc<MutexCollab>, FlowyError> {
  110. let collab_builder = self.collab_builder.upgrade().ok_or(FlowyError::new(
  111. ErrorCode::Internal,
  112. "Unexpected error: collab builder is not available",
  113. ))?;
  114. let collab = collab_builder.build(
  115. session.user_id,
  116. &session.user_id.to_string(),
  117. CollabType::UserAwareness,
  118. raw_data,
  119. collab_db,
  120. )?;
  121. Ok(collab)
  122. }
  123. /// Executes a function with user awareness.
  124. ///
  125. /// This function takes an asynchronous closure `f` that accepts a reference to a `UserAwareness`
  126. /// and returns an `Output`. If the current user awareness is set (i.e., is `Some`), it invokes
  127. /// the closure `f` with the user awareness. If the user awareness is not set (i.e., is `None`),
  128. /// it attempts to initialize the user awareness via a remote session. If the session fetch
  129. /// or user awareness initialization fails, it returns the provided `default_value`.
  130. ///
  131. /// # Parameters
  132. /// - `default_value`: A default value to return if the user awareness is `None` and cannot be initialized.
  133. /// - `f`: The asynchronous closure to execute with the user awareness.
  134. async fn with_awareness<F, Output>(&self, default_value: Output, f: F) -> Output
  135. where
  136. F: FnOnce(&UserAwareness) -> Output,
  137. {
  138. let user_awareness = self.user_awareness.lock().await;
  139. match &*user_awareness {
  140. None => {
  141. if let Ok(session) = self.get_session() {
  142. self
  143. .initialize_user_awareness(&session, UserAwarenessDataSource::Remote)
  144. .await;
  145. }
  146. default_value
  147. },
  148. Some(user_awareness) => f(&user_awareness.lock()),
  149. }
  150. }
  151. }
  152. /// Indicate using which data source to initialize the user awareness
  153. /// If the user is not a new user, the local data source is used. Otherwise, the remote data source is used.
  154. /// When using the remote data source, the user awareness will be initialized from the remote server.
  155. #[derive(Debug)]
  156. pub enum UserAwarenessDataSource {
  157. Local,
  158. Remote,
  159. }