folder_manager.rs 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. use crate::server_folder::folder_pad::{FolderOperations, FolderRevisionSynchronizer};
  2. use crate::server_folder::ServerFolder;
  3. use async_stream::stream;
  4. use flowy_sync::errors::{internal_sync_error, SyncError, SyncResult};
  5. use flowy_sync::ext::FolderCloudPersistence;
  6. use flowy_sync::{RevisionSyncResponse, RevisionUser};
  7. use folder_model::FolderInfo;
  8. use futures::stream::StreamExt;
  9. use revision_model::Revision;
  10. use std::{collections::HashMap, sync::Arc};
  11. use tokio::{
  12. sync::{mpsc, oneshot, RwLock},
  13. task::spawn_blocking,
  14. };
  15. use ws_model::ws_revision::{ClientRevisionWSData, ServerRevisionWSDataBuilder};
  16. pub struct ServerFolderManager {
  17. folder_handlers: Arc<RwLock<HashMap<String, Arc<OpenFolderHandler>>>>,
  18. persistence: Arc<dyn FolderCloudPersistence>,
  19. }
  20. impl ServerFolderManager {
  21. pub fn new(persistence: Arc<dyn FolderCloudPersistence>) -> Self {
  22. Self {
  23. folder_handlers: Arc::new(RwLock::new(HashMap::new())),
  24. persistence,
  25. }
  26. }
  27. pub async fn handle_client_revisions(
  28. &self,
  29. user: Arc<dyn RevisionUser>,
  30. client_data: ClientRevisionWSData,
  31. ) -> Result<(), SyncError> {
  32. let cloned_user = user.clone();
  33. let ack_id = client_data.rev_id;
  34. let folder_id = client_data.object_id;
  35. let user_id = user.user_id();
  36. let result = match self.get_folder_handler(&user_id, &folder_id).await {
  37. None => {
  38. let _ = self
  39. .create_folder(&user_id, &folder_id, client_data.revisions)
  40. .await
  41. .map_err(|e| {
  42. SyncError::internal().context(format!("Server create folder failed: {:?}", e))
  43. })?;
  44. Ok(())
  45. },
  46. Some(handler) => {
  47. handler.apply_revisions(user, client_data.revisions).await?;
  48. Ok(())
  49. },
  50. };
  51. if result.is_ok() {
  52. cloned_user.receive(RevisionSyncResponse::Ack(
  53. ServerRevisionWSDataBuilder::build_ack_message(&folder_id, ack_id),
  54. ));
  55. }
  56. result
  57. }
  58. pub async fn handle_client_ping(
  59. &self,
  60. user: Arc<dyn RevisionUser>,
  61. client_data: ClientRevisionWSData,
  62. ) -> Result<(), SyncError> {
  63. let user_id = user.user_id();
  64. let rev_id = client_data.rev_id;
  65. let folder_id = client_data.object_id.clone();
  66. match self.get_folder_handler(&user_id, &folder_id).await {
  67. None => {
  68. tracing::trace!("Folder:{} doesn't exist, ignore client ping", folder_id);
  69. Ok(())
  70. },
  71. Some(handler) => {
  72. handler.apply_ping(rev_id, user).await?;
  73. Ok(())
  74. },
  75. }
  76. }
  77. async fn get_folder_handler(
  78. &self,
  79. user_id: &str,
  80. folder_id: &str,
  81. ) -> Option<Arc<OpenFolderHandler>> {
  82. let folder_id = folder_id.to_owned();
  83. if let Some(handler) = self.folder_handlers.read().await.get(&folder_id).cloned() {
  84. return Some(handler);
  85. }
  86. let mut write_guard = self.folder_handlers.write().await;
  87. match self.persistence.read_folder(user_id, &folder_id).await {
  88. Ok(folder_info) => {
  89. let handler = self
  90. .create_folder_handler(folder_info)
  91. .await
  92. .map_err(internal_sync_error)
  93. .unwrap();
  94. write_guard.insert(folder_id, handler.clone());
  95. drop(write_guard);
  96. Some(handler)
  97. },
  98. Err(_) => None,
  99. }
  100. }
  101. async fn create_folder_handler(
  102. &self,
  103. folder_info: FolderInfo,
  104. ) -> Result<Arc<OpenFolderHandler>, SyncError> {
  105. let persistence = self.persistence.clone();
  106. let handle = spawn_blocking(|| OpenFolderHandler::new(folder_info, persistence))
  107. .await
  108. .map_err(|e| SyncError::internal().context(format!("Create folder handler failed: {}", e)))?;
  109. Ok(Arc::new(handle?))
  110. }
  111. #[tracing::instrument(level = "debug", skip(self, revisions), err)]
  112. async fn create_folder(
  113. &self,
  114. user_id: &str,
  115. folder_id: &str,
  116. revisions: Vec<Revision>,
  117. ) -> Result<Arc<OpenFolderHandler>, SyncError> {
  118. match self
  119. .persistence
  120. .create_folder(user_id, folder_id, revisions)
  121. .await?
  122. {
  123. Some(folder_info) => {
  124. let handler = self.create_folder_handler(folder_info).await?;
  125. self
  126. .folder_handlers
  127. .write()
  128. .await
  129. .insert(folder_id.to_owned(), handler.clone());
  130. Ok(handler)
  131. },
  132. None => Err(SyncError::internal().context(String::new())),
  133. }
  134. }
  135. }
  136. struct OpenFolderHandler {
  137. folder_id: String,
  138. sender: mpsc::Sender<FolderCommand>,
  139. }
  140. impl OpenFolderHandler {
  141. fn new(
  142. folder_info: FolderInfo,
  143. persistence: Arc<dyn FolderCloudPersistence>,
  144. ) -> SyncResult<Self> {
  145. let (sender, receiver) = mpsc::channel(1000);
  146. let folder_id = folder_info.folder_id.clone();
  147. let operations = FolderOperations::from_bytes(&folder_info.text)?;
  148. let sync_object = ServerFolder::from_operations(&folder_id, operations);
  149. let synchronizer = Arc::new(FolderRevisionSynchronizer::new(
  150. folder_info.rev_id,
  151. sync_object,
  152. persistence,
  153. ));
  154. let queue = FolderCommandRunner::new(&folder_id, receiver, synchronizer);
  155. tokio::task::spawn(queue.run());
  156. Ok(Self { folder_id, sender })
  157. }
  158. #[tracing::instrument(
  159. name = "server_folder_apply_revision",
  160. level = "trace",
  161. skip(self, user, revisions),
  162. err
  163. )]
  164. async fn apply_revisions(
  165. &self,
  166. user: Arc<dyn RevisionUser>,
  167. revisions: Vec<Revision>,
  168. ) -> SyncResult<()> {
  169. let (ret, rx) = oneshot::channel();
  170. let msg = FolderCommand::ApplyRevisions {
  171. user,
  172. revisions,
  173. ret,
  174. };
  175. self.send(msg, rx).await?
  176. }
  177. async fn apply_ping(&self, rev_id: i64, user: Arc<dyn RevisionUser>) -> Result<(), SyncError> {
  178. let (ret, rx) = oneshot::channel();
  179. let msg = FolderCommand::Ping { user, rev_id, ret };
  180. self.send(msg, rx).await?
  181. }
  182. async fn send<T>(&self, msg: FolderCommand, rx: oneshot::Receiver<T>) -> SyncResult<T> {
  183. self
  184. .sender
  185. .send(msg)
  186. .await
  187. .map_err(|e| SyncError::internal().context(format!("Send folder command failed: {}", e)))?;
  188. rx.await.map_err(internal_sync_error)
  189. }
  190. }
  191. impl std::ops::Drop for OpenFolderHandler {
  192. fn drop(&mut self) {
  193. tracing::trace!("{} OpenFolderHandler was dropped", self.folder_id);
  194. }
  195. }
  196. enum FolderCommand {
  197. ApplyRevisions {
  198. user: Arc<dyn RevisionUser>,
  199. revisions: Vec<Revision>,
  200. ret: oneshot::Sender<SyncResult<()>>,
  201. },
  202. Ping {
  203. user: Arc<dyn RevisionUser>,
  204. rev_id: i64,
  205. ret: oneshot::Sender<SyncResult<()>>,
  206. },
  207. }
  208. struct FolderCommandRunner {
  209. folder_id: String,
  210. receiver: Option<mpsc::Receiver<FolderCommand>>,
  211. synchronizer: Arc<FolderRevisionSynchronizer>,
  212. }
  213. impl FolderCommandRunner {
  214. fn new(
  215. folder_id: &str,
  216. receiver: mpsc::Receiver<FolderCommand>,
  217. synchronizer: Arc<FolderRevisionSynchronizer>,
  218. ) -> Self {
  219. Self {
  220. folder_id: folder_id.to_owned(),
  221. receiver: Some(receiver),
  222. synchronizer,
  223. }
  224. }
  225. async fn run(mut self) {
  226. let mut receiver = self
  227. .receiver
  228. .take()
  229. .expect("FolderCommandRunner's receiver should only take one time");
  230. let stream = stream! {
  231. while let Some(msg) = receiver.recv().await {
  232. yield msg;
  233. }
  234. };
  235. stream.for_each(|msg| self.handle_message(msg)).await;
  236. }
  237. async fn handle_message(&self, msg: FolderCommand) {
  238. match msg {
  239. FolderCommand::ApplyRevisions {
  240. user,
  241. revisions,
  242. ret,
  243. } => {
  244. let result = self
  245. .synchronizer
  246. .sync_revisions(user, revisions)
  247. .await
  248. .map_err(internal_sync_error);
  249. let _ = ret.send(result);
  250. },
  251. FolderCommand::Ping { user, rev_id, ret } => {
  252. let result = self
  253. .synchronizer
  254. .pong(user, rev_id)
  255. .await
  256. .map_err(internal_sync_error);
  257. let _ = ret.send(result);
  258. },
  259. }
  260. }
  261. }
  262. impl std::ops::Drop for FolderCommandRunner {
  263. fn drop(&mut self) {
  264. tracing::trace!("{} FolderCommandRunner was dropped", self.folder_id);
  265. }
  266. }