folder_manager.rs 11 KB


  1. use crate::entities::revision::{RepeatedRevision, Revision};
  2. use crate::server_folder::folder_pad::{FolderOperations, FolderRevisionSynchronizer};
  3. use crate::{
  4. entities::{folder::FolderInfo, ws_data::ServerRevisionWSDataBuilder},
  5. errors::{internal_error, CollaborateError, CollaborateResult},
  6. protobuf::ClientRevisionWSData,
  7. server_folder::folder_pad::ServerFolder,
  8. synchronizer::{RevisionSyncPersistence, RevisionSyncResponse, RevisionUser},
  9. util::rev_id_from_str,
  10. };
  11. use async_stream::stream;
  12. use futures::stream::StreamExt;
  13. use lib_infra::future::BoxResultFuture;
  14. use std::{collections::HashMap, fmt::Debug, sync::Arc};
  15. use tokio::{
  16. sync::{mpsc, oneshot, RwLock},
  17. task::spawn_blocking,
  18. };
  19. pub trait FolderCloudPersistence: Send + Sync + Debug {
  20. fn read_folder(&self, user_id: &str, folder_id: &str) -> BoxResultFuture<FolderInfo, CollaborateError>;
  21. fn create_folder(
  22. &self,
  23. user_id: &str,
  24. folder_id: &str,
  25. repeated_revision: RepeatedRevision,
  26. ) -> BoxResultFuture<Option<FolderInfo>, CollaborateError>;
  27. fn save_folder_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>;
  28. fn read_folder_revisions(
  29. &self,
  30. folder_id: &str,
  31. rev_ids: Option<Vec<i64>>,
  32. ) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
  33. fn reset_folder(
  34. &self,
  35. folder_id: &str,
  36. repeated_revision: RepeatedRevision,
  37. ) -> BoxResultFuture<(), CollaborateError>;
  38. }
  39. impl RevisionSyncPersistence for Arc<dyn FolderCloudPersistence> {
  40. fn read_revisions(
  41. &self,
  42. object_id: &str,
  43. rev_ids: Option<Vec<i64>>,
  44. ) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
  45. (**self).read_folder_revisions(object_id, rev_ids)
  46. }
  47. fn save_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
  48. (**self).save_folder_revisions(repeated_revision)
  49. }
  50. fn reset_object(
  51. &self,
  52. object_id: &str,
  53. repeated_revision: RepeatedRevision,
  54. ) -> BoxResultFuture<(), CollaborateError> {
  55. (**self).reset_folder(object_id, repeated_revision)
  56. }
  57. }
  58. pub struct ServerFolderManager {
  59. folder_handlers: Arc<RwLock<HashMap<String, Arc<OpenFolderHandler>>>>,
  60. persistence: Arc<dyn FolderCloudPersistence>,
  61. }
  62. impl ServerFolderManager {
  63. pub fn new(persistence: Arc<dyn FolderCloudPersistence>) -> Self {
  64. Self {
  65. folder_handlers: Arc::new(RwLock::new(HashMap::new())),
  66. persistence,
  67. }
  68. }
  69. pub async fn handle_client_revisions(
  70. &self,
  71. user: Arc<dyn RevisionUser>,
  72. mut client_data: ClientRevisionWSData,
  73. ) -> Result<(), CollaborateError> {
  74. let repeated_revision: RepeatedRevision = client_data.take_revisions().into();
  75. let cloned_user = user.clone();
  76. let ack_id = rev_id_from_str(&client_data.data_id)?;
  77. let folder_id = client_data.object_id;
  78. let user_id = user.user_id();
  79. let result = match self.get_folder_handler(&user_id, &folder_id).await {
  80. None => {
  81. let _ = self
  82. .create_folder(&user_id, &folder_id, repeated_revision)
  83. .await
  84. .map_err(|e| CollaborateError::internal().context(format!("Server create folder failed: {}", e)))?;
  85. Ok(())
  86. }
  87. Some(handler) => {
  88. let _ = handler.apply_revisions(user, repeated_revision).await?;
  89. Ok(())
  90. }
  91. };
  92. if result.is_ok() {
  93. cloned_user.receive(RevisionSyncResponse::Ack(
  94. ServerRevisionWSDataBuilder::build_ack_message(&folder_id, ack_id),
  95. ));
  96. }
  97. result
  98. }
  99. pub async fn handle_client_ping(
  100. &self,
  101. user: Arc<dyn RevisionUser>,
  102. client_data: ClientRevisionWSData,
  103. ) -> Result<(), CollaborateError> {
  104. let user_id = user.user_id();
  105. let rev_id = rev_id_from_str(&client_data.data_id)?;
  106. let folder_id = client_data.object_id.clone();
  107. match self.get_folder_handler(&user_id, &folder_id).await {
  108. None => {
  109. tracing::trace!("Folder:{} doesn't exist, ignore client ping", folder_id);
  110. Ok(())
  111. }
  112. Some(handler) => {
  113. let _ = handler.apply_ping(rev_id, user).await?;
  114. Ok(())
  115. }
  116. }
  117. }
  118. async fn get_folder_handler(&self, user_id: &str, folder_id: &str) -> Option<Arc<OpenFolderHandler>> {
  119. let folder_id = folder_id.to_owned();
  120. if let Some(handler) = self.folder_handlers.read().await.get(&folder_id).cloned() {
  121. return Some(handler);
  122. }
  123. let mut write_guard = self.folder_handlers.write().await;
  124. match self.persistence.read_folder(user_id, &folder_id).await {
  125. Ok(folder_info) => {
  126. let handler = self
  127. .create_folder_handler(folder_info)
  128. .await
  129. .map_err(internal_error)
  130. .unwrap();
  131. write_guard.insert(folder_id, handler.clone());
  132. drop(write_guard);
  133. Some(handler)
  134. }
  135. Err(_) => None,
  136. }
  137. }
  138. async fn create_folder_handler(&self, folder_info: FolderInfo) -> Result<Arc<OpenFolderHandler>, CollaborateError> {
  139. let persistence = self.persistence.clone();
  140. let handle = spawn_blocking(|| OpenFolderHandler::new(folder_info, persistence))
  141. .await
  142. .map_err(|e| CollaborateError::internal().context(format!("Create folder handler failed: {}", e)))?;
  143. Ok(Arc::new(handle?))
  144. }
  145. #[tracing::instrument(level = "debug", skip(self, repeated_revision), err)]
  146. async fn create_folder(
  147. &self,
  148. user_id: &str,
  149. folder_id: &str,
  150. repeated_revision: RepeatedRevision,
  151. ) -> Result<Arc<OpenFolderHandler>, CollaborateError> {
  152. match self
  153. .persistence
  154. .create_folder(user_id, folder_id, repeated_revision)
  155. .await?
  156. {
  157. Some(folder_info) => {
  158. let handler = self.create_folder_handler(folder_info).await?;
  159. self.folder_handlers
  160. .write()
  161. .await
  162. .insert(folder_id.to_owned(), handler.clone());
  163. Ok(handler)
  164. }
  165. None => Err(CollaborateError::internal().context(String::new())),
  166. }
  167. }
  168. }
  169. struct OpenFolderHandler {
  170. folder_id: String,
  171. sender: mpsc::Sender<FolderCommand>,
  172. }
  173. impl OpenFolderHandler {
  174. fn new(folder_info: FolderInfo, persistence: Arc<dyn FolderCloudPersistence>) -> CollaborateResult<Self> {
  175. let (sender, receiver) = mpsc::channel(1000);
  176. let folder_id = folder_info.folder_id.clone();
  177. let operations = FolderOperations::from_bytes(&folder_info.text)?;
  178. let sync_object = ServerFolder::from_operations(&folder_id, operations);
  179. let synchronizer = Arc::new(FolderRevisionSynchronizer::new(
  180. folder_info.rev_id,
  181. sync_object,
  182. persistence,
  183. ));
  184. let queue = FolderCommandRunner::new(&folder_id, receiver, synchronizer);
  185. tokio::task::spawn(queue.run());
  186. Ok(Self { folder_id, sender })
  187. }
  188. #[tracing::instrument(
  189. name = "server_folder_apply_revision",
  190. level = "trace",
  191. skip(self, user, repeated_revision),
  192. err
  193. )]
  194. async fn apply_revisions(
  195. &self,
  196. user: Arc<dyn RevisionUser>,
  197. repeated_revision: RepeatedRevision,
  198. ) -> CollaborateResult<()> {
  199. let (ret, rx) = oneshot::channel();
  200. let msg = FolderCommand::ApplyRevisions {
  201. user,
  202. repeated_revision,
  203. ret,
  204. };
  205. self.send(msg, rx).await?
  206. }
  207. async fn apply_ping(&self, rev_id: i64, user: Arc<dyn RevisionUser>) -> Result<(), CollaborateError> {
  208. let (ret, rx) = oneshot::channel();
  209. let msg = FolderCommand::Ping { user, rev_id, ret };
  210. self.send(msg, rx).await?
  211. }
  212. async fn send<T>(&self, msg: FolderCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
  213. let _ = self
  214. .sender
  215. .send(msg)
  216. .await
  217. .map_err(|e| CollaborateError::internal().context(format!("Send folder command failed: {}", e)))?;
  218. rx.await.map_err(internal_error)
  219. }
  220. }
  221. impl std::ops::Drop for OpenFolderHandler {
  222. fn drop(&mut self) {
  223. tracing::trace!("{} OpenFolderHandler was dropped", self.folder_id);
  224. }
  225. }
  226. enum FolderCommand {
  227. ApplyRevisions {
  228. user: Arc<dyn RevisionUser>,
  229. repeated_revision: RepeatedRevision,
  230. ret: oneshot::Sender<CollaborateResult<()>>,
  231. },
  232. Ping {
  233. user: Arc<dyn RevisionUser>,
  234. rev_id: i64,
  235. ret: oneshot::Sender<CollaborateResult<()>>,
  236. },
  237. }
  238. struct FolderCommandRunner {
  239. folder_id: String,
  240. receiver: Option<mpsc::Receiver<FolderCommand>>,
  241. synchronizer: Arc<FolderRevisionSynchronizer>,
  242. }
  243. impl FolderCommandRunner {
  244. fn new(
  245. folder_id: &str,
  246. receiver: mpsc::Receiver<FolderCommand>,
  247. synchronizer: Arc<FolderRevisionSynchronizer>,
  248. ) -> Self {
  249. Self {
  250. folder_id: folder_id.to_owned(),
  251. receiver: Some(receiver),
  252. synchronizer,
  253. }
  254. }
  255. async fn run(mut self) {
  256. let mut receiver = self
  257. .receiver
  258. .take()
  259. .expect("FolderCommandRunner's receiver should only take one time");
  260. let stream = stream! {
  261. loop {
  262. match receiver.recv().await {
  263. Some(msg) => yield msg,
  264. None => break,
  265. }
  266. }
  267. };
  268. stream.for_each(|msg| self.handle_message(msg)).await;
  269. }
  270. async fn handle_message(&self, msg: FolderCommand) {
  271. match msg {
  272. FolderCommand::ApplyRevisions {
  273. user,
  274. repeated_revision,
  275. ret,
  276. } => {
  277. let result = self
  278. .synchronizer
  279. .sync_revisions(user, repeated_revision)
  280. .await
  281. .map_err(internal_error);
  282. let _ = ret.send(result);
  283. }
  284. FolderCommand::Ping { user, rev_id, ret } => {
  285. let result = self.synchronizer.pong(user, rev_id).await.map_err(internal_error);
  286. let _ = ret.send(result);
  287. }
  288. }
  289. }
  290. }
  291. impl std::ops::Drop for FolderCommandRunner {
  292. fn drop(&mut self) {
  293. tracing::trace!("{} FolderCommandRunner was dropped", self.folder_id);
  294. }
  295. }