document_manager.rs 12 KB


  1. use crate::{
  2. entities::{document_info::DocumentInfo, ws_data::ServerRevisionWSDataBuilder},
  3. errors::{internal_error, CollaborateError, CollaborateResult},
  4. protobuf::{ClientRevisionWSData, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
  5. server_document::document_pad::ServerDocument,
  6. synchronizer::{RevisionSyncPersistence, RevisionSyncResponse, RevisionSynchronizer, RevisionUser},
  7. util::rev_id_from_str,
  8. };
  9. use async_stream::stream;
  10. use dashmap::DashMap;
  11. use futures::stream::StreamExt;
  12. use lib_infra::future::BoxResultFuture;
  13. use lib_ot::rich_text::{RichTextAttributes, RichTextDelta};
  14. use std::{collections::HashMap, fmt::Debug, sync::Arc};
  15. use tokio::{
  16. sync::{mpsc, oneshot, RwLock},
  17. task::spawn_blocking,
  18. };
/// Storage backend used by the server to load and persist documents and
/// their revisions.
///
/// Every method returns a boxed future, so implementations may perform
/// asynchronous I/O. Implementors must be `Send + Sync + Debug` so the
/// backend can be shared behind an `Arc`.
pub trait DocumentCloudPersistence: Send + Sync + Debug {
    /// Loads the [`DocumentInfo`] stored for `doc_id`.
    ///
    /// Note: `ServerDocumentManager` treats any error returned here as
    /// "the document does not exist".
    fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError>;

    /// Creates the document `doc_id` from the given revisions.
    ///
    /// Resolves to `Some(DocumentInfo)` on success. `ServerDocumentManager`
    /// treats `None` as a failure to build the document from the revisions.
    fn create_document(
        &self,
        doc_id: &str,
        repeated_revision: RepeatedRevisionPB,
    ) -> BoxResultFuture<Option<DocumentInfo>, CollaborateError>;

    /// Reads revisions that belong to `doc_id`.
    ///
    /// NOTE(review): presumably `rev_ids == None` means "all revisions" and
    /// `Some(ids)` restricts the result to those ids; confirm against the
    /// implementors.
    fn read_document_revisions(
        &self,
        doc_id: &str,
        rev_ids: Option<Vec<i64>>,
    ) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError>;

    /// Persists the given revisions.
    fn save_document_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>;

    /// Resets the document `doc_id` using the given revisions.
    ///
    /// NOTE(review): presumably this replaces the stored revisions with
    /// `repeated_revision`; confirm against the implementors.
    fn reset_document(
        &self,
        doc_id: &str,
        repeated_revision: RepeatedRevisionPB,
    ) -> BoxResultFuture<(), CollaborateError>;
}
  38. impl RevisionSyncPersistence for Arc<dyn DocumentCloudPersistence> {
  39. fn read_revisions(
  40. &self,
  41. object_id: &str,
  42. rev_ids: Option<Vec<i64>>,
  43. ) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
  44. (**self).read_document_revisions(object_id, rev_ids)
  45. }
  46. fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
  47. (**self).save_document_revisions(repeated_revision)
  48. }
  49. fn reset_object(
  50. &self,
  51. object_id: &str,
  52. repeated_revision: RepeatedRevisionPB,
  53. ) -> BoxResultFuture<(), CollaborateError> {
  54. (**self).reset_document(object_id, repeated_revision)
  55. }
  56. }
/// Server-side registry of open documents.
///
/// Routes client revisions, pings and reset requests to the handler of the
/// targeted document, opening the document from `persistence` on demand.
pub struct ServerDocumentManager {
    /// Handlers of the currently open documents, keyed by document id.
    document_handlers: Arc<RwLock<HashMap<String, Arc<OpenDocumentHandler>>>>,
    /// Backend used to read, create and update documents.
    persistence: Arc<dyn DocumentCloudPersistence>,
}
  61. impl ServerDocumentManager {
  62. pub fn new(persistence: Arc<dyn DocumentCloudPersistence>) -> Self {
  63. Self {
  64. document_handlers: Arc::new(RwLock::new(HashMap::new())),
  65. persistence,
  66. }
  67. }
  68. pub async fn handle_client_revisions(
  69. &self,
  70. user: Arc<dyn RevisionUser>,
  71. mut client_data: ClientRevisionWSData,
  72. ) -> Result<(), CollaborateError> {
  73. let repeated_revision = client_data.take_revisions();
  74. let cloned_user = user.clone();
  75. let ack_id = rev_id_from_str(&client_data.data_id)?;
  76. let object_id = client_data.object_id;
  77. let result = match self.get_document_handler(&object_id).await {
  78. None => {
  79. tracing::trace!("Can't find the document. Creating the document {}", object_id);
  80. let _ = self.create_document(&object_id, repeated_revision).await.map_err(|e| {
  81. CollaborateError::internal().context(format!("Server create document failed: {}", e))
  82. })?;
  83. Ok(())
  84. }
  85. Some(handler) => {
  86. let _ = handler.apply_revisions(user, repeated_revision).await?;
  87. Ok(())
  88. }
  89. };
  90. if result.is_ok() {
  91. cloned_user.receive(RevisionSyncResponse::Ack(
  92. ServerRevisionWSDataBuilder::build_ack_message(&object_id, ack_id),
  93. ));
  94. }
  95. result
  96. }
  97. pub async fn handle_client_ping(
  98. &self,
  99. user: Arc<dyn RevisionUser>,
  100. client_data: ClientRevisionWSData,
  101. ) -> Result<(), CollaborateError> {
  102. let rev_id = rev_id_from_str(&client_data.data_id)?;
  103. let doc_id = client_data.object_id.clone();
  104. match self.get_document_handler(&doc_id).await {
  105. None => {
  106. tracing::trace!("Document:{} doesn't exist, ignore client ping", doc_id);
  107. Ok(())
  108. }
  109. Some(handler) => {
  110. let _ = handler.apply_ping(rev_id, user).await?;
  111. Ok(())
  112. }
  113. }
  114. }
  115. pub async fn handle_document_reset(
  116. &self,
  117. doc_id: &str,
  118. mut repeated_revision: RepeatedRevisionPB,
  119. ) -> Result<(), CollaborateError> {
  120. repeated_revision.mut_items().sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
  121. match self.get_document_handler(doc_id).await {
  122. None => {
  123. tracing::warn!("Document:{} doesn't exist, ignore document reset", doc_id);
  124. Ok(())
  125. }
  126. Some(handler) => {
  127. let _ = handler.apply_document_reset(repeated_revision).await?;
  128. Ok(())
  129. }
  130. }
  131. }
  132. async fn get_document_handler(&self, doc_id: &str) -> Option<Arc<OpenDocumentHandler>> {
  133. if let Some(handler) = self.document_handlers.read().await.get(doc_id).cloned() {
  134. return Some(handler);
  135. }
  136. let mut write_guard = self.document_handlers.write().await;
  137. match self.persistence.read_document(doc_id).await {
  138. Ok(doc) => {
  139. let handler = self.create_document_handler(doc).await.map_err(internal_error).unwrap();
  140. write_guard.insert(doc_id.to_owned(), handler.clone());
  141. drop(write_guard);
  142. Some(handler)
  143. }
  144. Err(_) => None,
  145. }
  146. }
  147. #[tracing::instrument(level = "debug", skip(self, repeated_revision), err)]
  148. async fn create_document(
  149. &self,
  150. doc_id: &str,
  151. repeated_revision: RepeatedRevisionPB,
  152. ) -> Result<Arc<OpenDocumentHandler>, CollaborateError> {
  153. match self.persistence.create_document(doc_id, repeated_revision).await? {
  154. None => Err(CollaborateError::internal().context("Create document info from revisions failed")),
  155. Some(doc) => {
  156. let handler = self.create_document_handler(doc).await?;
  157. self.document_handlers
  158. .write()
  159. .await
  160. .insert(doc_id.to_owned(), handler.clone());
  161. Ok(handler)
  162. }
  163. }
  164. }
  165. async fn create_document_handler(&self, doc: DocumentInfo) -> Result<Arc<OpenDocumentHandler>, CollaborateError> {
  166. let persistence = self.persistence.clone();
  167. let handle = spawn_blocking(|| OpenDocumentHandler::new(doc, persistence))
  168. .await
  169. .map_err(|e| CollaborateError::internal().context(format!("Create document handler failed: {}", e)))?;
  170. Ok(Arc::new(handle?))
  171. }
  172. }
  173. impl std::ops::Drop for ServerDocumentManager {
  174. fn drop(&mut self) {
  175. log::trace!("ServerDocumentManager was dropped");
  176. }
  177. }
/// Revision synchronizer specialised for rich-text documents.
type DocumentRevisionSynchronizer = RevisionSynchronizer<RichTextAttributes>;
/// Handle to one open document.
///
/// Requests are not processed here; they are sent as [`DocumentCommand`]s
/// over `sender` to the `DocumentCommandRunner` task spawned in `new`.
struct OpenDocumentHandler {
    /// Id of the document this handler serves.
    doc_id: String,
    /// Sending half of the command channel consumed by the runner task.
    sender: mpsc::Sender<DocumentCommand>,
    /// Users that have sent revisions or pings to this document, keyed by
    /// user id.
    users: DashMap<String, Arc<dyn RevisionUser>>,
}
  184. impl OpenDocumentHandler {
  185. fn new(doc: DocumentInfo, persistence: Arc<dyn DocumentCloudPersistence>) -> Result<Self, CollaborateError> {
  186. let doc_id = doc.doc_id.clone();
  187. let (sender, receiver) = mpsc::channel(1000);
  188. let users = DashMap::new();
  189. let delta = RichTextDelta::from_bytes(&doc.text)?;
  190. let sync_object = ServerDocument::from_delta(&doc_id, delta);
  191. let synchronizer = Arc::new(DocumentRevisionSynchronizer::new(doc.rev_id, sync_object, persistence));
  192. let queue = DocumentCommandRunner::new(&doc.doc_id, receiver, synchronizer);
  193. tokio::task::spawn(queue.run());
  194. Ok(Self { doc_id, sender, users })
  195. }
  196. #[tracing::instrument(
  197. name = "server_document_apply_revision",
  198. level = "trace",
  199. skip(self, user, repeated_revision),
  200. err
  201. )]
  202. async fn apply_revisions(
  203. &self,
  204. user: Arc<dyn RevisionUser>,
  205. repeated_revision: RepeatedRevisionPB,
  206. ) -> Result<(), CollaborateError> {
  207. let (ret, rx) = oneshot::channel();
  208. self.users.insert(user.user_id(), user.clone());
  209. let msg = DocumentCommand::ApplyRevisions {
  210. user,
  211. repeated_revision,
  212. ret,
  213. };
  214. let result = self.send(msg, rx).await?;
  215. result
  216. }
  217. async fn apply_ping(&self, rev_id: i64, user: Arc<dyn RevisionUser>) -> Result<(), CollaborateError> {
  218. let (ret, rx) = oneshot::channel();
  219. self.users.insert(user.user_id(), user.clone());
  220. let msg = DocumentCommand::Ping { user, rev_id, ret };
  221. let result = self.send(msg, rx).await?;
  222. result
  223. }
  224. #[tracing::instrument(level = "debug", skip(self, repeated_revision), err)]
  225. async fn apply_document_reset(&self, repeated_revision: RepeatedRevisionPB) -> Result<(), CollaborateError> {
  226. let (ret, rx) = oneshot::channel();
  227. let msg = DocumentCommand::Reset { repeated_revision, ret };
  228. let result = self.send(msg, rx).await?;
  229. result
  230. }
  231. async fn send<T>(&self, msg: DocumentCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
  232. let _ = self
  233. .sender
  234. .send(msg)
  235. .await
  236. .map_err(|e| CollaborateError::internal().context(format!("Send document command failed: {}", e)))?;
  237. Ok(rx.await.map_err(internal_error)?)
  238. }
  239. }
  240. impl std::ops::Drop for OpenDocumentHandler {
  241. fn drop(&mut self) {
  242. tracing::trace!("{} OpenDocHandle was dropped", self.doc_id);
  243. }
  244. }
/// Commands sent from an `OpenDocumentHandler` to its
/// `DocumentCommandRunner`.
///
/// Every variant carries a `ret` sender on which the runner reports the
/// outcome of the command.
// NOTE(review): `Debug` is not derived; presumably a field type does not
// implement it. Confirm before re-enabling.
// #[derive(Debug)]
enum DocumentCommand {
    /// Synchronize `repeated_revision` on behalf of `user`.
    ApplyRevisions {
        user: Arc<dyn RevisionUser>,
        repeated_revision: RepeatedRevisionPB,
        ret: oneshot::Sender<CollaborateResult<()>>,
    },
    /// Answer a ping from `user` that reports `rev_id`.
    Ping {
        user: Arc<dyn RevisionUser>,
        rev_id: i64,
        ret: oneshot::Sender<CollaborateResult<()>>,
    },
    /// Reset the document using `repeated_revision`.
    Reset {
        repeated_revision: RepeatedRevisionPB,
        ret: oneshot::Sender<CollaborateResult<()>>,
    },
}
/// Task state that drains the command channel of one document and executes
/// each command against the document's synchronizer.
struct DocumentCommandRunner {
    /// Id of the document this runner serves.
    pub doc_id: String,
    /// Receiving half of the command channel; taken (left as `None`) when
    /// `run` starts.
    receiver: Option<mpsc::Receiver<DocumentCommand>>,
    /// Synchronizer that executes the commands.
    synchronizer: Arc<DocumentRevisionSynchronizer>,
}
  267. impl DocumentCommandRunner {
  268. fn new(
  269. doc_id: &str,
  270. receiver: mpsc::Receiver<DocumentCommand>,
  271. synchronizer: Arc<DocumentRevisionSynchronizer>,
  272. ) -> Self {
  273. Self {
  274. doc_id: doc_id.to_owned(),
  275. receiver: Some(receiver),
  276. synchronizer,
  277. }
  278. }
  279. async fn run(mut self) {
  280. let mut receiver = self
  281. .receiver
  282. .take()
  283. .expect("DocumentCommandRunner's receiver should only take one time");
  284. let stream = stream! {
  285. loop {
  286. match receiver.recv().await {
  287. Some(msg) => yield msg,
  288. None => break,
  289. }
  290. }
  291. };
  292. stream.for_each(|msg| self.handle_message(msg)).await;
  293. }
  294. async fn handle_message(&self, msg: DocumentCommand) {
  295. match msg {
  296. DocumentCommand::ApplyRevisions {
  297. user,
  298. repeated_revision,
  299. ret,
  300. } => {
  301. let result = self
  302. .synchronizer
  303. .sync_revisions(user, repeated_revision)
  304. .await
  305. .map_err(internal_error);
  306. let _ = ret.send(result);
  307. }
  308. DocumentCommand::Ping { user, rev_id, ret } => {
  309. let result = self.synchronizer.pong(user, rev_id).await.map_err(internal_error);
  310. let _ = ret.send(result);
  311. }
  312. DocumentCommand::Reset { repeated_revision, ret } => {
  313. let result = self.synchronizer.reset(repeated_revision).await.map_err(internal_error);
  314. let _ = ret.send(result);
  315. }
  316. }
  317. }
  318. }
  319. impl std::ops::Drop for DocumentCommandRunner {
  320. fn drop(&mut self) {
  321. tracing::trace!("{} DocumentCommandQueue was dropped", self.doc_id);
  322. }
  323. }