document_manager.rs 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. use crate::server_document::document_pad::ServerDocument;
  2. use async_stream::stream;
  3. use dashmap::DashMap;
  4. use document_model::document::DocumentInfo;
  5. use flowy_sync::errors::{internal_sync_error, SyncError, SyncResult};
  6. use flowy_sync::ext::DocumentCloudPersistence;
  7. use flowy_sync::{RevisionSyncResponse, RevisionSynchronizer, RevisionUser};
  8. use futures::stream::StreamExt;
  9. use lib_ot::core::AttributeHashMap;
  10. use lib_ot::text_delta::DeltaTextOperations;
  11. use revision_model::Revision;
  12. use std::{collections::HashMap, sync::Arc};
  13. use tokio::{
  14. sync::{mpsc, oneshot, RwLock},
  15. task::spawn_blocking,
  16. };
  17. use ws_model::ws_revision::{ClientRevisionWSData, ServerRevisionWSDataBuilder};
  18. pub struct ServerDocumentManager {
  19. document_handlers: Arc<RwLock<HashMap<String, Arc<OpenDocumentHandler>>>>,
  20. persistence: Arc<dyn DocumentCloudPersistence>,
  21. }
  22. impl ServerDocumentManager {
  23. pub fn new(persistence: Arc<dyn DocumentCloudPersistence>) -> Self {
  24. Self {
  25. document_handlers: Arc::new(RwLock::new(HashMap::new())),
  26. persistence,
  27. }
  28. }
  29. pub async fn handle_client_revisions(
  30. &self,
  31. user: Arc<dyn RevisionUser>,
  32. client_data: ClientRevisionWSData,
  33. ) -> Result<(), SyncError> {
  34. let cloned_user = user.clone();
  35. let ack_id = client_data.rev_id;
  36. let object_id = client_data.object_id;
  37. let result = match self.get_document_handler(&object_id).await {
  38. None => {
  39. tracing::trace!(
  40. "Can't find the document. Creating the document {}",
  41. object_id
  42. );
  43. let _ = self
  44. .create_document(&object_id, client_data.revisions)
  45. .await
  46. .map_err(|e| {
  47. SyncError::internal().context(format!("Server create document failed: {}", e))
  48. })?;
  49. Ok(())
  50. },
  51. Some(handler) => {
  52. handler.apply_revisions(user, client_data.revisions).await?;
  53. Ok(())
  54. },
  55. };
  56. if result.is_ok() {
  57. cloned_user.receive(RevisionSyncResponse::Ack(
  58. ServerRevisionWSDataBuilder::build_ack_message(&object_id, ack_id),
  59. ));
  60. }
  61. result
  62. }
  63. pub async fn handle_client_ping(
  64. &self,
  65. user: Arc<dyn RevisionUser>,
  66. client_data: ClientRevisionWSData,
  67. ) -> Result<(), SyncError> {
  68. let rev_id = client_data.rev_id;
  69. let doc_id = client_data.object_id.clone();
  70. match self.get_document_handler(&doc_id).await {
  71. None => {
  72. tracing::trace!("Document:{} doesn't exist, ignore client ping", doc_id);
  73. Ok(())
  74. },
  75. Some(handler) => {
  76. handler.apply_ping(rev_id, user).await?;
  77. Ok(())
  78. },
  79. }
  80. }
  81. pub async fn handle_document_reset(
  82. &self,
  83. doc_id: &str,
  84. mut revisions: Vec<Revision>,
  85. ) -> Result<(), SyncError> {
  86. revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
  87. match self.get_document_handler(doc_id).await {
  88. None => {
  89. tracing::warn!("Document:{} doesn't exist, ignore document reset", doc_id);
  90. Ok(())
  91. },
  92. Some(handler) => {
  93. handler.apply_document_reset(revisions).await?;
  94. Ok(())
  95. },
  96. }
  97. }
  98. async fn get_document_handler(&self, doc_id: &str) -> Option<Arc<OpenDocumentHandler>> {
  99. if let Some(handler) = self.document_handlers.read().await.get(doc_id).cloned() {
  100. return Some(handler);
  101. }
  102. let mut write_guard = self.document_handlers.write().await;
  103. match self.persistence.read_document(doc_id).await {
  104. Ok(doc) => {
  105. let handler = self.create_document_handler(doc).await.unwrap();
  106. write_guard.insert(doc_id.to_owned(), handler.clone());
  107. drop(write_guard);
  108. Some(handler)
  109. },
  110. Err(_) => None,
  111. }
  112. }
  113. async fn create_document(
  114. &self,
  115. doc_id: &str,
  116. revisions: Vec<Revision>,
  117. ) -> Result<Arc<OpenDocumentHandler>, SyncError> {
  118. match self.persistence.create_document(doc_id, revisions).await? {
  119. None => Err(SyncError::internal().context("Create document info from revisions failed")),
  120. Some(doc) => {
  121. let handler = self.create_document_handler(doc).await?;
  122. self
  123. .document_handlers
  124. .write()
  125. .await
  126. .insert(doc_id.to_owned(), handler.clone());
  127. Ok(handler)
  128. },
  129. }
  130. }
  131. #[tracing::instrument(level = "debug", skip(self, doc), err)]
  132. async fn create_document_handler(
  133. &self,
  134. doc: DocumentInfo,
  135. ) -> Result<Arc<OpenDocumentHandler>, SyncError> {
  136. let persistence = self.persistence.clone();
  137. let handle = spawn_blocking(|| OpenDocumentHandler::new(doc, persistence))
  138. .await
  139. .map_err(|e| {
  140. SyncError::internal().context(format!("Create document handler failed: {}", e))
  141. })?;
  142. Ok(Arc::new(handle?))
  143. }
  144. }
  145. impl std::ops::Drop for ServerDocumentManager {
  146. fn drop(&mut self) {
  147. log::trace!("ServerDocumentManager was dropped");
  148. }
  149. }
  150. type DocumentRevisionSynchronizer = RevisionSynchronizer<AttributeHashMap>;
  151. struct OpenDocumentHandler {
  152. doc_id: String,
  153. sender: mpsc::Sender<DocumentCommand>,
  154. users: DashMap<String, Arc<dyn RevisionUser>>,
  155. }
  156. impl OpenDocumentHandler {
  157. fn new(
  158. doc: DocumentInfo,
  159. persistence: Arc<dyn DocumentCloudPersistence>,
  160. ) -> Result<Self, SyncError> {
  161. let doc_id = doc.doc_id.clone();
  162. let (sender, receiver) = mpsc::channel(1000);
  163. let users = DashMap::new();
  164. let operations = DeltaTextOperations::from_bytes(&doc.data)?;
  165. let sync_object = ServerDocument::from_operations(&doc_id, operations);
  166. let synchronizer = Arc::new(DocumentRevisionSynchronizer::new(
  167. doc.rev_id,
  168. sync_object,
  169. persistence,
  170. ));
  171. let queue = DocumentCommandRunner::new(&doc.doc_id, receiver, synchronizer);
  172. tokio::task::spawn(queue.run());
  173. Ok(Self {
  174. doc_id,
  175. sender,
  176. users,
  177. })
  178. }
  179. #[tracing::instrument(
  180. name = "server_document_apply_revision",
  181. level = "trace",
  182. skip(self, user, revisions),
  183. err
  184. )]
  185. async fn apply_revisions(
  186. &self,
  187. user: Arc<dyn RevisionUser>,
  188. revisions: Vec<Revision>,
  189. ) -> Result<(), SyncError> {
  190. let (ret, rx) = oneshot::channel();
  191. self.users.insert(user.user_id(), user.clone());
  192. let msg = DocumentCommand::ApplyRevisions {
  193. user,
  194. revisions,
  195. ret,
  196. };
  197. self.send(msg, rx).await?
  198. }
  199. async fn apply_ping(&self, rev_id: i64, user: Arc<dyn RevisionUser>) -> Result<(), SyncError> {
  200. let (ret, rx) = oneshot::channel();
  201. self.users.insert(user.user_id(), user.clone());
  202. let msg = DocumentCommand::Ping { user, rev_id, ret };
  203. self.send(msg, rx).await?
  204. }
  205. #[tracing::instrument(level = "debug", skip(self, revisions), err)]
  206. async fn apply_document_reset(&self, revisions: Vec<Revision>) -> Result<(), SyncError> {
  207. let (ret, rx) = oneshot::channel();
  208. let msg = DocumentCommand::Reset { revisions, ret };
  209. self.send(msg, rx).await?
  210. }
  211. async fn send<T>(&self, msg: DocumentCommand, rx: oneshot::Receiver<T>) -> SyncResult<T> {
  212. self
  213. .sender
  214. .send(msg)
  215. .await
  216. .map_err(|e| SyncError::internal().context(format!("Send document command failed: {}", e)))?;
  217. rx.await.map_err(internal_sync_error)
  218. }
  219. }
  220. impl std::ops::Drop for OpenDocumentHandler {
  221. fn drop(&mut self) {
  222. tracing::trace!("{} OpenDocHandle was dropped", self.doc_id);
  223. }
  224. }
  225. // #[derive(Debug)]
  226. enum DocumentCommand {
  227. ApplyRevisions {
  228. user: Arc<dyn RevisionUser>,
  229. revisions: Vec<Revision>,
  230. ret: oneshot::Sender<SyncResult<()>>,
  231. },
  232. Ping {
  233. user: Arc<dyn RevisionUser>,
  234. rev_id: i64,
  235. ret: oneshot::Sender<SyncResult<()>>,
  236. },
  237. Reset {
  238. revisions: Vec<Revision>,
  239. ret: oneshot::Sender<SyncResult<()>>,
  240. },
  241. }
  242. struct DocumentCommandRunner {
  243. pub doc_id: String,
  244. receiver: Option<mpsc::Receiver<DocumentCommand>>,
  245. synchronizer: Arc<DocumentRevisionSynchronizer>,
  246. }
  247. impl DocumentCommandRunner {
  248. fn new(
  249. doc_id: &str,
  250. receiver: mpsc::Receiver<DocumentCommand>,
  251. synchronizer: Arc<DocumentRevisionSynchronizer>,
  252. ) -> Self {
  253. Self {
  254. doc_id: doc_id.to_owned(),
  255. receiver: Some(receiver),
  256. synchronizer,
  257. }
  258. }
  259. async fn run(mut self) {
  260. let mut receiver = self
  261. .receiver
  262. .take()
  263. .expect("DocumentCommandRunner's receiver should only take one time");
  264. let stream = stream! {
  265. while let Some(msg) = receiver.recv().await {
  266. yield msg;
  267. }
  268. };
  269. stream.for_each(|msg| self.handle_message(msg)).await;
  270. }
  271. async fn handle_message(&self, msg: DocumentCommand) {
  272. match msg {
  273. DocumentCommand::ApplyRevisions {
  274. user,
  275. revisions,
  276. ret,
  277. } => {
  278. let result = self
  279. .synchronizer
  280. .sync_revisions(user, revisions)
  281. .await
  282. .map_err(internal_sync_error);
  283. let _ = ret.send(result);
  284. },
  285. DocumentCommand::Ping { user, rev_id, ret } => {
  286. let result = self
  287. .synchronizer
  288. .pong(user, rev_id)
  289. .await
  290. .map_err(internal_sync_error);
  291. let _ = ret.send(result);
  292. },
  293. DocumentCommand::Reset { revisions, ret } => {
  294. let result = self
  295. .synchronizer
  296. .reset(revisions)
  297. .await
  298. .map_err(internal_sync_error);
  299. let _ = ret.send(result);
  300. },
  301. }
  302. }
  303. }
  304. impl std::ops::Drop for DocumentCommandRunner {
  305. fn drop(&mut self) {
  306. tracing::trace!("{} DocumentCommandQueue was dropped", self.doc_id);
  307. }
  308. }