folder_manager.rs 11 KB


  1. use crate::entities::revision::{RepeatedRevision, Revision};
  2. use crate::{
  3. entities::{
  4. folder::{FolderDelta, FolderInfo},
  5. ws_data::ServerRevisionWSDataBuilder,
  6. },
  7. errors::{internal_error, CollaborateError, CollaborateResult},
  8. protobuf::ClientRevisionWSData,
  9. server_folder::folder_pad::ServerFolder,
  10. synchronizer::{RevisionSyncPersistence, RevisionSyncResponse, RevisionSynchronizer, RevisionUser},
  11. util::rev_id_from_str,
  12. };
  13. use async_stream::stream;
  14. use futures::stream::StreamExt;
  15. use lib_infra::future::BoxResultFuture;
  16. use lib_ot::core::PhantomAttributes;
  17. use std::{collections::HashMap, fmt::Debug, sync::Arc};
  18. use tokio::{
  19. sync::{mpsc, oneshot, RwLock},
  20. task::spawn_blocking,
  21. };
  22. pub trait FolderCloudPersistence: Send + Sync + Debug {
  23. fn read_folder(&self, user_id: &str, folder_id: &str) -> BoxResultFuture<FolderInfo, CollaborateError>;
  24. fn create_folder(
  25. &self,
  26. user_id: &str,
  27. folder_id: &str,
  28. repeated_revision: RepeatedRevision,
  29. ) -> BoxResultFuture<Option<FolderInfo>, CollaborateError>;
  30. fn save_folder_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>;
  31. fn read_folder_revisions(
  32. &self,
  33. folder_id: &str,
  34. rev_ids: Option<Vec<i64>>,
  35. ) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
  36. fn reset_folder(
  37. &self,
  38. folder_id: &str,
  39. repeated_revision: RepeatedRevision,
  40. ) -> BoxResultFuture<(), CollaborateError>;
  41. }
  42. impl RevisionSyncPersistence for Arc<dyn FolderCloudPersistence> {
  43. fn read_revisions(
  44. &self,
  45. object_id: &str,
  46. rev_ids: Option<Vec<i64>>,
  47. ) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
  48. (**self).read_folder_revisions(object_id, rev_ids)
  49. }
  50. fn save_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
  51. (**self).save_folder_revisions(repeated_revision)
  52. }
  53. fn reset_object(
  54. &self,
  55. object_id: &str,
  56. repeated_revision: RepeatedRevision,
  57. ) -> BoxResultFuture<(), CollaborateError> {
  58. (**self).reset_folder(object_id, repeated_revision)
  59. }
  60. }
  61. pub struct ServerFolderManager {
  62. folder_handlers: Arc<RwLock<HashMap<String, Arc<OpenFolderHandler>>>>,
  63. persistence: Arc<dyn FolderCloudPersistence>,
  64. }
  65. impl ServerFolderManager {
  66. pub fn new(persistence: Arc<dyn FolderCloudPersistence>) -> Self {
  67. Self {
  68. folder_handlers: Arc::new(RwLock::new(HashMap::new())),
  69. persistence,
  70. }
  71. }
  72. pub async fn handle_client_revisions(
  73. &self,
  74. user: Arc<dyn RevisionUser>,
  75. mut client_data: ClientRevisionWSData,
  76. ) -> Result<(), CollaborateError> {
  77. let repeated_revision: RepeatedRevision = client_data.take_revisions().into();
  78. let cloned_user = user.clone();
  79. let ack_id = rev_id_from_str(&client_data.data_id)?;
  80. let folder_id = client_data.object_id;
  81. let user_id = user.user_id();
  82. let result = match self.get_folder_handler(&user_id, &folder_id).await {
  83. None => {
  84. let _ = self
  85. .create_folder(&user_id, &folder_id, repeated_revision)
  86. .await
  87. .map_err(|e| CollaborateError::internal().context(format!("Server create folder failed: {}", e)))?;
  88. Ok(())
  89. }
  90. Some(handler) => {
  91. let _ = handler.apply_revisions(user, repeated_revision).await?;
  92. Ok(())
  93. }
  94. };
  95. if result.is_ok() {
  96. cloned_user.receive(RevisionSyncResponse::Ack(
  97. ServerRevisionWSDataBuilder::build_ack_message(&folder_id, ack_id),
  98. ));
  99. }
  100. result
  101. }
  102. pub async fn handle_client_ping(
  103. &self,
  104. user: Arc<dyn RevisionUser>,
  105. client_data: ClientRevisionWSData,
  106. ) -> Result<(), CollaborateError> {
  107. let user_id = user.user_id();
  108. let rev_id = rev_id_from_str(&client_data.data_id)?;
  109. let folder_id = client_data.object_id.clone();
  110. match self.get_folder_handler(&user_id, &folder_id).await {
  111. None => {
  112. tracing::trace!("Folder:{} doesn't exist, ignore client ping", folder_id);
  113. Ok(())
  114. }
  115. Some(handler) => {
  116. let _ = handler.apply_ping(rev_id, user).await?;
  117. Ok(())
  118. }
  119. }
  120. }
  121. async fn get_folder_handler(&self, user_id: &str, folder_id: &str) -> Option<Arc<OpenFolderHandler>> {
  122. let folder_id = folder_id.to_owned();
  123. if let Some(handler) = self.folder_handlers.read().await.get(&folder_id).cloned() {
  124. return Some(handler);
  125. }
  126. let mut write_guard = self.folder_handlers.write().await;
  127. match self.persistence.read_folder(user_id, &folder_id).await {
  128. Ok(folder_info) => {
  129. let handler = self
  130. .create_folder_handler(folder_info)
  131. .await
  132. .map_err(internal_error)
  133. .unwrap();
  134. write_guard.insert(folder_id, handler.clone());
  135. drop(write_guard);
  136. Some(handler)
  137. }
  138. Err(_) => None,
  139. }
  140. }
  141. async fn create_folder_handler(&self, folder_info: FolderInfo) -> Result<Arc<OpenFolderHandler>, CollaborateError> {
  142. let persistence = self.persistence.clone();
  143. let handle = spawn_blocking(|| OpenFolderHandler::new(folder_info, persistence))
  144. .await
  145. .map_err(|e| CollaborateError::internal().context(format!("Create folder handler failed: {}", e)))?;
  146. Ok(Arc::new(handle?))
  147. }
  148. #[tracing::instrument(level = "debug", skip(self, repeated_revision), err)]
  149. async fn create_folder(
  150. &self,
  151. user_id: &str,
  152. folder_id: &str,
  153. repeated_revision: RepeatedRevision,
  154. ) -> Result<Arc<OpenFolderHandler>, CollaborateError> {
  155. match self
  156. .persistence
  157. .create_folder(user_id, folder_id, repeated_revision)
  158. .await?
  159. {
  160. Some(folder_info) => {
  161. let handler = self.create_folder_handler(folder_info).await?;
  162. self.folder_handlers
  163. .write()
  164. .await
  165. .insert(folder_id.to_owned(), handler.clone());
  166. Ok(handler)
  167. }
  168. None => Err(CollaborateError::internal().context(String::new())),
  169. }
  170. }
  171. }
  172. type FolderRevisionSynchronizer = RevisionSynchronizer<PhantomAttributes>;
  173. struct OpenFolderHandler {
  174. folder_id: String,
  175. sender: mpsc::Sender<FolderCommand>,
  176. }
  177. impl OpenFolderHandler {
  178. fn new(folder_info: FolderInfo, persistence: Arc<dyn FolderCloudPersistence>) -> CollaborateResult<Self> {
  179. let (sender, receiver) = mpsc::channel(1000);
  180. let folder_id = folder_info.folder_id.clone();
  181. let delta = FolderDelta::from_bytes(&folder_info.text)?;
  182. let sync_object = ServerFolder::from_delta(&folder_id, delta);
  183. let synchronizer = Arc::new(FolderRevisionSynchronizer::new(
  184. folder_info.rev_id,
  185. sync_object,
  186. persistence,
  187. ));
  188. let queue = FolderCommandRunner::new(&folder_id, receiver, synchronizer);
  189. tokio::task::spawn(queue.run());
  190. Ok(Self { folder_id, sender })
  191. }
  192. #[tracing::instrument(
  193. name = "server_folder_apply_revision",
  194. level = "trace",
  195. skip(self, user, repeated_revision),
  196. err
  197. )]
  198. async fn apply_revisions(
  199. &self,
  200. user: Arc<dyn RevisionUser>,
  201. repeated_revision: RepeatedRevision,
  202. ) -> CollaborateResult<()> {
  203. let (ret, rx) = oneshot::channel();
  204. let msg = FolderCommand::ApplyRevisions {
  205. user,
  206. repeated_revision,
  207. ret,
  208. };
  209. self.send(msg, rx).await?
  210. }
  211. async fn apply_ping(&self, rev_id: i64, user: Arc<dyn RevisionUser>) -> Result<(), CollaborateError> {
  212. let (ret, rx) = oneshot::channel();
  213. let msg = FolderCommand::Ping { user, rev_id, ret };
  214. self.send(msg, rx).await?
  215. }
  216. async fn send<T>(&self, msg: FolderCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
  217. let _ = self
  218. .sender
  219. .send(msg)
  220. .await
  221. .map_err(|e| CollaborateError::internal().context(format!("Send folder command failed: {}", e)))?;
  222. Ok(rx.await.map_err(internal_error)?)
  223. }
  224. }
  225. impl std::ops::Drop for OpenFolderHandler {
  226. fn drop(&mut self) {
  227. tracing::trace!("{} OpenFolderHandler was dropped", self.folder_id);
  228. }
  229. }
  230. enum FolderCommand {
  231. ApplyRevisions {
  232. user: Arc<dyn RevisionUser>,
  233. repeated_revision: RepeatedRevision,
  234. ret: oneshot::Sender<CollaborateResult<()>>,
  235. },
  236. Ping {
  237. user: Arc<dyn RevisionUser>,
  238. rev_id: i64,
  239. ret: oneshot::Sender<CollaborateResult<()>>,
  240. },
  241. }
  242. struct FolderCommandRunner {
  243. folder_id: String,
  244. receiver: Option<mpsc::Receiver<FolderCommand>>,
  245. synchronizer: Arc<FolderRevisionSynchronizer>,
  246. }
  247. impl FolderCommandRunner {
  248. fn new(
  249. folder_id: &str,
  250. receiver: mpsc::Receiver<FolderCommand>,
  251. synchronizer: Arc<FolderRevisionSynchronizer>,
  252. ) -> Self {
  253. Self {
  254. folder_id: folder_id.to_owned(),
  255. receiver: Some(receiver),
  256. synchronizer,
  257. }
  258. }
  259. async fn run(mut self) {
  260. let mut receiver = self
  261. .receiver
  262. .take()
  263. .expect("FolderCommandRunner's receiver should only take one time");
  264. let stream = stream! {
  265. loop {
  266. match receiver.recv().await {
  267. Some(msg) => yield msg,
  268. None => break,
  269. }
  270. }
  271. };
  272. stream.for_each(|msg| self.handle_message(msg)).await;
  273. }
  274. async fn handle_message(&self, msg: FolderCommand) {
  275. match msg {
  276. FolderCommand::ApplyRevisions {
  277. user,
  278. repeated_revision,
  279. ret,
  280. } => {
  281. let result = self
  282. .synchronizer
  283. .sync_revisions(user, repeated_revision)
  284. .await
  285. .map_err(internal_error);
  286. let _ = ret.send(result);
  287. }
  288. FolderCommand::Ping { user, rev_id, ret } => {
  289. let result = self.synchronizer.pong(user, rev_id).await.map_err(internal_error);
  290. let _ = ret.send(result);
  291. }
  292. }
  293. }
  294. }
  295. impl std::ops::Drop for FolderCommandRunner {
  296. fn drop(&mut self) {
  297. tracing::trace!("{} FolderCommandRunner was dropped", self.folder_id);
  298. }
  299. }