folder_manager.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. use crate::{
  2. entities::{
  3. folder_info::{FolderDelta, FolderInfo},
  4. ws_data::ServerRevisionWSDataBuilder,
  5. },
  6. errors::{internal_error, CollaborateError, CollaborateResult},
  7. protobuf::{ClientRevisionWSData, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
  8. server_folder::folder_pad::ServerFolder,
  9. synchronizer::{RevisionSyncPersistence, RevisionSyncResponse, RevisionSynchronizer, RevisionUser},
  10. util::rev_id_from_str,
  11. };
  12. use async_stream::stream;
  13. use futures::stream::StreamExt;
  14. use lib_infra::future::BoxResultFuture;
  15. use lib_ot::core::PlainAttributes;
  16. use std::{collections::HashMap, fmt::Debug, sync::Arc};
  17. use tokio::{
  18. sync::{mpsc, oneshot, RwLock},
  19. task::spawn_blocking,
  20. };
  21. pub trait FolderCloudPersistence: Send + Sync + Debug {
  22. fn read_folder(&self, user_id: &str, folder_id: &str) -> BoxResultFuture<FolderInfo, CollaborateError>;
  23. fn create_folder(
  24. &self,
  25. user_id: &str,
  26. folder_id: &str,
  27. repeated_revision: RepeatedRevisionPB,
  28. ) -> BoxResultFuture<Option<FolderInfo>, CollaborateError>;
  29. fn save_folder_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>;
  30. fn read_folder_revisions(
  31. &self,
  32. folder_id: &str,
  33. rev_ids: Option<Vec<i64>>,
  34. ) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError>;
  35. fn reset_folder(
  36. &self,
  37. folder_id: &str,
  38. repeated_revision: RepeatedRevisionPB,
  39. ) -> BoxResultFuture<(), CollaborateError>;
  40. }
  41. impl RevisionSyncPersistence for Arc<dyn FolderCloudPersistence> {
  42. fn read_revisions(
  43. &self,
  44. object_id: &str,
  45. rev_ids: Option<Vec<i64>>,
  46. ) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
  47. (**self).read_folder_revisions(object_id, rev_ids)
  48. }
  49. fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
  50. (**self).save_folder_revisions(repeated_revision)
  51. }
  52. fn reset_object(
  53. &self,
  54. object_id: &str,
  55. repeated_revision: RepeatedRevisionPB,
  56. ) -> BoxResultFuture<(), CollaborateError> {
  57. (**self).reset_folder(object_id, repeated_revision)
  58. }
  59. }
  60. pub struct ServerFolderManager {
  61. folder_handlers: Arc<RwLock<HashMap<String, Arc<OpenFolderHandler>>>>,
  62. persistence: Arc<dyn FolderCloudPersistence>,
  63. }
  64. impl ServerFolderManager {
  65. pub fn new(persistence: Arc<dyn FolderCloudPersistence>) -> Self {
  66. Self {
  67. folder_handlers: Arc::new(RwLock::new(HashMap::new())),
  68. persistence,
  69. }
  70. }
  71. pub async fn handle_client_revisions(
  72. &self,
  73. user: Arc<dyn RevisionUser>,
  74. mut client_data: ClientRevisionWSData,
  75. ) -> Result<(), CollaborateError> {
  76. let repeated_revision = client_data.take_revisions();
  77. let cloned_user = user.clone();
  78. let ack_id = rev_id_from_str(&client_data.data_id)?;
  79. let folder_id = client_data.object_id;
  80. let user_id = user.user_id();
  81. let result = match self.get_folder_handler(&user_id, &folder_id).await {
  82. None => {
  83. let _ = self
  84. .create_folder(&user_id, &folder_id, repeated_revision)
  85. .await
  86. .map_err(|e| CollaborateError::internal().context(format!("Server create folder failed: {}", e)))?;
  87. Ok(())
  88. }
  89. Some(handler) => {
  90. let _ = handler.apply_revisions(user, repeated_revision).await?;
  91. Ok(())
  92. }
  93. };
  94. if result.is_ok() {
  95. cloned_user.receive(RevisionSyncResponse::Ack(
  96. ServerRevisionWSDataBuilder::build_ack_message(&folder_id, ack_id),
  97. ));
  98. }
  99. result
  100. }
  101. pub async fn handle_client_ping(
  102. &self,
  103. user: Arc<dyn RevisionUser>,
  104. client_data: ClientRevisionWSData,
  105. ) -> Result<(), CollaborateError> {
  106. let user_id = user.user_id();
  107. let rev_id = rev_id_from_str(&client_data.data_id)?;
  108. let folder_id = client_data.object_id.clone();
  109. match self.get_folder_handler(&user_id, &folder_id).await {
  110. None => {
  111. tracing::trace!("Folder:{} doesn't exist, ignore client ping", folder_id);
  112. Ok(())
  113. }
  114. Some(handler) => {
  115. let _ = handler.apply_ping(rev_id, user).await?;
  116. Ok(())
  117. }
  118. }
  119. }
  120. async fn get_folder_handler(&self, user_id: &str, folder_id: &str) -> Option<Arc<OpenFolderHandler>> {
  121. let folder_id = folder_id.to_owned();
  122. if let Some(handler) = self.folder_handlers.read().await.get(&folder_id).cloned() {
  123. return Some(handler);
  124. }
  125. let mut write_guard = self.folder_handlers.write().await;
  126. match self.persistence.read_folder(user_id, &folder_id).await {
  127. Ok(folder_info) => {
  128. let handler = self
  129. .create_folder_handler(folder_info)
  130. .await
  131. .map_err(internal_error)
  132. .unwrap();
  133. write_guard.insert(folder_id, handler.clone());
  134. drop(write_guard);
  135. Some(handler)
  136. }
  137. Err(_) => None,
  138. }
  139. }
  140. async fn create_folder_handler(&self, folder_info: FolderInfo) -> Result<Arc<OpenFolderHandler>, CollaborateError> {
  141. let persistence = self.persistence.clone();
  142. let handle = spawn_blocking(|| OpenFolderHandler::new(folder_info, persistence))
  143. .await
  144. .map_err(|e| CollaborateError::internal().context(format!("Create folder handler failed: {}", e)))?;
  145. Ok(Arc::new(handle?))
  146. }
  147. #[tracing::instrument(level = "debug", skip(self, repeated_revision), err)]
  148. async fn create_folder(
  149. &self,
  150. user_id: &str,
  151. folder_id: &str,
  152. repeated_revision: RepeatedRevisionPB,
  153. ) -> Result<Arc<OpenFolderHandler>, CollaborateError> {
  154. match self
  155. .persistence
  156. .create_folder(user_id, folder_id, repeated_revision)
  157. .await?
  158. {
  159. Some(folder_info) => {
  160. let handler = self.create_folder_handler(folder_info).await?;
  161. self.folder_handlers
  162. .write()
  163. .await
  164. .insert(folder_id.to_owned(), handler.clone());
  165. Ok(handler)
  166. }
  167. None => Err(CollaborateError::internal().context(String::new())),
  168. }
  169. }
  170. }
  171. type FolderRevisionSynchronizer = RevisionSynchronizer<PlainAttributes>;
  172. struct OpenFolderHandler {
  173. folder_id: String,
  174. sender: mpsc::Sender<FolderCommand>,
  175. }
  176. impl OpenFolderHandler {
  177. fn new(folder_info: FolderInfo, persistence: Arc<dyn FolderCloudPersistence>) -> CollaborateResult<Self> {
  178. let (sender, receiver) = mpsc::channel(1000);
  179. let folder_id = folder_info.folder_id.clone();
  180. let delta = FolderDelta::from_bytes(&folder_info.text)?;
  181. let sync_object = ServerFolder::from_delta(&folder_id, delta);
  182. let synchronizer = Arc::new(FolderRevisionSynchronizer::new(
  183. folder_info.rev_id,
  184. sync_object,
  185. persistence,
  186. ));
  187. let queue = FolderCommandRunner::new(&folder_id, receiver, synchronizer);
  188. tokio::task::spawn(queue.run());
  189. Ok(Self { folder_id, sender })
  190. }
  191. #[tracing::instrument(
  192. name = "server_folder_apply_revision",
  193. level = "trace",
  194. skip(self, user, repeated_revision),
  195. err
  196. )]
  197. async fn apply_revisions(
  198. &self,
  199. user: Arc<dyn RevisionUser>,
  200. repeated_revision: RepeatedRevisionPB,
  201. ) -> CollaborateResult<()> {
  202. let (ret, rx) = oneshot::channel();
  203. let msg = FolderCommand::ApplyRevisions {
  204. user,
  205. repeated_revision,
  206. ret,
  207. };
  208. self.send(msg, rx).await?
  209. }
  210. async fn apply_ping(&self, rev_id: i64, user: Arc<dyn RevisionUser>) -> Result<(), CollaborateError> {
  211. let (ret, rx) = oneshot::channel();
  212. let msg = FolderCommand::Ping { user, rev_id, ret };
  213. self.send(msg, rx).await?
  214. }
  215. async fn send<T>(&self, msg: FolderCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
  216. let _ = self
  217. .sender
  218. .send(msg)
  219. .await
  220. .map_err(|e| CollaborateError::internal().context(format!("Send folder command failed: {}", e)))?;
  221. Ok(rx.await.map_err(internal_error)?)
  222. }
  223. }
  224. impl std::ops::Drop for OpenFolderHandler {
  225. fn drop(&mut self) {
  226. tracing::trace!("{} OpenFolderHandler was dropped", self.folder_id);
  227. }
  228. }
  229. enum FolderCommand {
  230. ApplyRevisions {
  231. user: Arc<dyn RevisionUser>,
  232. repeated_revision: RepeatedRevisionPB,
  233. ret: oneshot::Sender<CollaborateResult<()>>,
  234. },
  235. Ping {
  236. user: Arc<dyn RevisionUser>,
  237. rev_id: i64,
  238. ret: oneshot::Sender<CollaborateResult<()>>,
  239. },
  240. }
  241. struct FolderCommandRunner {
  242. folder_id: String,
  243. receiver: Option<mpsc::Receiver<FolderCommand>>,
  244. synchronizer: Arc<FolderRevisionSynchronizer>,
  245. }
  246. impl FolderCommandRunner {
  247. fn new(
  248. folder_id: &str,
  249. receiver: mpsc::Receiver<FolderCommand>,
  250. synchronizer: Arc<FolderRevisionSynchronizer>,
  251. ) -> Self {
  252. Self {
  253. folder_id: folder_id.to_owned(),
  254. receiver: Some(receiver),
  255. synchronizer,
  256. }
  257. }
  258. async fn run(mut self) {
  259. let mut receiver = self
  260. .receiver
  261. .take()
  262. .expect("FolderCommandRunner's receiver should only take one time");
  263. let stream = stream! {
  264. loop {
  265. match receiver.recv().await {
  266. Some(msg) => yield msg,
  267. None => break,
  268. }
  269. }
  270. };
  271. stream.for_each(|msg| self.handle_message(msg)).await;
  272. }
  273. async fn handle_message(&self, msg: FolderCommand) {
  274. match msg {
  275. FolderCommand::ApplyRevisions {
  276. user,
  277. repeated_revision,
  278. ret,
  279. } => {
  280. let result = self
  281. .synchronizer
  282. .sync_revisions(user, repeated_revision)
  283. .await
  284. .map_err(internal_error);
  285. let _ = ret.send(result);
  286. }
  287. FolderCommand::Ping { user, rev_id, ret } => {
  288. let result = self.synchronizer.pong(user, rev_id).await.map_err(internal_error);
  289. let _ = ret.send(result);
  290. }
  291. }
  292. }
  293. }
  294. impl std::ops::Drop for FolderCommandRunner {
  295. fn drop(&mut self) {
  296. tracing::trace!("{} FolderCommandRunner was dropped", self.folder_id);
  297. }
  298. }