document_manager.rs 12 KB


  1. use crate::entities::revision::{RepeatedRevision, Revision};
  2. use crate::{
  3. entities::{text_block::DocumentPB, ws_data::ServerRevisionWSDataBuilder},
  4. errors::{internal_error, CollaborateError, CollaborateResult},
  5. protobuf::ClientRevisionWSData,
  6. server_document::document_pad::ServerDocument,
  7. synchronizer::{RevisionSyncPersistence, RevisionSyncResponse, RevisionSynchronizer, RevisionUser},
  8. util::rev_id_from_str,
  9. };
  10. use async_stream::stream;
  11. use dashmap::DashMap;
  12. use futures::stream::StreamExt;
  13. use lib_infra::future::BoxResultFuture;
  14. use lib_ot::rich_text::{RichTextAttributes, RichTextDelta};
  15. use std::{collections::HashMap, fmt::Debug, sync::Arc};
  16. use tokio::{
  17. sync::{mpsc, oneshot, RwLock},
  18. task::spawn_blocking,
  19. };
  20. pub trait TextBlockCloudPersistence: Send + Sync + Debug {
  21. fn read_text_block(&self, doc_id: &str) -> BoxResultFuture<DocumentPB, CollaborateError>;
  22. fn create_text_block(
  23. &self,
  24. doc_id: &str,
  25. repeated_revision: RepeatedRevision,
  26. ) -> BoxResultFuture<Option<DocumentPB>, CollaborateError>;
  27. fn read_text_block_revisions(
  28. &self,
  29. doc_id: &str,
  30. rev_ids: Option<Vec<i64>>,
  31. ) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
  32. fn save_text_block_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>;
  33. fn reset_text_block(
  34. &self,
  35. doc_id: &str,
  36. repeated_revision: RepeatedRevision,
  37. ) -> BoxResultFuture<(), CollaborateError>;
  38. }
  39. impl RevisionSyncPersistence for Arc<dyn TextBlockCloudPersistence> {
  40. fn read_revisions(
  41. &self,
  42. object_id: &str,
  43. rev_ids: Option<Vec<i64>>,
  44. ) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
  45. (**self).read_text_block_revisions(object_id, rev_ids)
  46. }
  47. fn save_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
  48. (**self).save_text_block_revisions(repeated_revision)
  49. }
  50. fn reset_object(
  51. &self,
  52. object_id: &str,
  53. repeated_revision: RepeatedRevision,
  54. ) -> BoxResultFuture<(), CollaborateError> {
  55. (**self).reset_text_block(object_id, repeated_revision)
  56. }
  57. }
  58. pub struct ServerDocumentManager {
  59. document_handlers: Arc<RwLock<HashMap<String, Arc<OpenDocumentHandler>>>>,
  60. persistence: Arc<dyn TextBlockCloudPersistence>,
  61. }
  62. impl ServerDocumentManager {
  63. pub fn new(persistence: Arc<dyn TextBlockCloudPersistence>) -> Self {
  64. Self {
  65. document_handlers: Arc::new(RwLock::new(HashMap::new())),
  66. persistence,
  67. }
  68. }
  69. pub async fn handle_client_revisions(
  70. &self,
  71. user: Arc<dyn RevisionUser>,
  72. mut client_data: ClientRevisionWSData,
  73. ) -> Result<(), CollaborateError> {
  74. let repeated_revision: RepeatedRevision = client_data.take_revisions().into();
  75. let cloned_user = user.clone();
  76. let ack_id = rev_id_from_str(&client_data.data_id)?;
  77. let object_id = client_data.object_id;
  78. let result = match self.get_document_handler(&object_id).await {
  79. None => {
  80. tracing::trace!("Can't find the document. Creating the document {}", object_id);
  81. let _ = self.create_document(&object_id, repeated_revision).await.map_err(|e| {
  82. CollaborateError::internal().context(format!("Server create document failed: {}", e))
  83. })?;
  84. Ok(())
  85. }
  86. Some(handler) => {
  87. let _ = handler.apply_revisions(user, repeated_revision).await?;
  88. Ok(())
  89. }
  90. };
  91. if result.is_ok() {
  92. cloned_user.receive(RevisionSyncResponse::Ack(
  93. ServerRevisionWSDataBuilder::build_ack_message(&object_id, ack_id),
  94. ));
  95. }
  96. result
  97. }
  98. pub async fn handle_client_ping(
  99. &self,
  100. user: Arc<dyn RevisionUser>,
  101. client_data: ClientRevisionWSData,
  102. ) -> Result<(), CollaborateError> {
  103. let rev_id = rev_id_from_str(&client_data.data_id)?;
  104. let doc_id = client_data.object_id.clone();
  105. match self.get_document_handler(&doc_id).await {
  106. None => {
  107. tracing::trace!("Document:{} doesn't exist, ignore client ping", doc_id);
  108. Ok(())
  109. }
  110. Some(handler) => {
  111. let _ = handler.apply_ping(rev_id, user).await?;
  112. Ok(())
  113. }
  114. }
  115. }
  116. pub async fn handle_document_reset(
  117. &self,
  118. doc_id: &str,
  119. mut repeated_revision: RepeatedRevision,
  120. ) -> Result<(), CollaborateError> {
  121. repeated_revision.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
  122. match self.get_document_handler(doc_id).await {
  123. None => {
  124. tracing::warn!("Document:{} doesn't exist, ignore document reset", doc_id);
  125. Ok(())
  126. }
  127. Some(handler) => {
  128. let _ = handler.apply_document_reset(repeated_revision).await?;
  129. Ok(())
  130. }
  131. }
  132. }
  133. async fn get_document_handler(&self, doc_id: &str) -> Option<Arc<OpenDocumentHandler>> {
  134. if let Some(handler) = self.document_handlers.read().await.get(doc_id).cloned() {
  135. return Some(handler);
  136. }
  137. let mut write_guard = self.document_handlers.write().await;
  138. match self.persistence.read_text_block(doc_id).await {
  139. Ok(doc) => {
  140. let handler = self.create_document_handler(doc).await.map_err(internal_error).unwrap();
  141. write_guard.insert(doc_id.to_owned(), handler.clone());
  142. drop(write_guard);
  143. Some(handler)
  144. }
  145. Err(_) => None,
  146. }
  147. }
  148. async fn create_document(
  149. &self,
  150. doc_id: &str,
  151. repeated_revision: RepeatedRevision,
  152. ) -> Result<Arc<OpenDocumentHandler>, CollaborateError> {
  153. match self.persistence.create_text_block(doc_id, repeated_revision).await? {
  154. None => Err(CollaborateError::internal().context("Create document info from revisions failed")),
  155. Some(doc) => {
  156. let handler = self.create_document_handler(doc).await?;
  157. self.document_handlers
  158. .write()
  159. .await
  160. .insert(doc_id.to_owned(), handler.clone());
  161. Ok(handler)
  162. }
  163. }
  164. }
  165. #[tracing::instrument(level = "debug", skip(self, doc), err)]
  166. async fn create_document_handler(&self, doc: DocumentPB) -> Result<Arc<OpenDocumentHandler>, CollaborateError> {
  167. let persistence = self.persistence.clone();
  168. let handle = spawn_blocking(|| OpenDocumentHandler::new(doc, persistence))
  169. .await
  170. .map_err(|e| CollaborateError::internal().context(format!("Create document handler failed: {}", e)))?;
  171. Ok(Arc::new(handle?))
  172. }
  173. }
  174. impl std::ops::Drop for ServerDocumentManager {
  175. fn drop(&mut self) {
  176. log::trace!("ServerDocumentManager was dropped");
  177. }
  178. }
  179. type DocumentRevisionSynchronizer = RevisionSynchronizer<RichTextAttributes>;
  180. struct OpenDocumentHandler {
  181. doc_id: String,
  182. sender: mpsc::Sender<DocumentCommand>,
  183. users: DashMap<String, Arc<dyn RevisionUser>>,
  184. }
  185. impl OpenDocumentHandler {
  186. fn new(doc: DocumentPB, persistence: Arc<dyn TextBlockCloudPersistence>) -> Result<Self, CollaborateError> {
  187. let doc_id = doc.block_id.clone();
  188. let (sender, receiver) = mpsc::channel(1000);
  189. let users = DashMap::new();
  190. let delta = RichTextDelta::from_bytes(&doc.text)?;
  191. let sync_object = ServerDocument::from_delta(&doc_id, delta);
  192. let synchronizer = Arc::new(DocumentRevisionSynchronizer::new(doc.rev_id, sync_object, persistence));
  193. let queue = DocumentCommandRunner::new(&doc.block_id, receiver, synchronizer);
  194. tokio::task::spawn(queue.run());
  195. Ok(Self { doc_id, sender, users })
  196. }
  197. #[tracing::instrument(
  198. name = "server_document_apply_revision",
  199. level = "trace",
  200. skip(self, user, repeated_revision),
  201. err
  202. )]
  203. async fn apply_revisions(
  204. &self,
  205. user: Arc<dyn RevisionUser>,
  206. repeated_revision: RepeatedRevision,
  207. ) -> Result<(), CollaborateError> {
  208. let (ret, rx) = oneshot::channel();
  209. self.users.insert(user.user_id(), user.clone());
  210. let msg = DocumentCommand::ApplyRevisions {
  211. user,
  212. repeated_revision,
  213. ret,
  214. };
  215. let result = self.send(msg, rx).await?;
  216. result
  217. }
  218. async fn apply_ping(&self, rev_id: i64, user: Arc<dyn RevisionUser>) -> Result<(), CollaborateError> {
  219. let (ret, rx) = oneshot::channel();
  220. self.users.insert(user.user_id(), user.clone());
  221. let msg = DocumentCommand::Ping { user, rev_id, ret };
  222. let result = self.send(msg, rx).await?;
  223. result
  224. }
  225. #[tracing::instrument(level = "debug", skip(self, repeated_revision), err)]
  226. async fn apply_document_reset(&self, repeated_revision: RepeatedRevision) -> Result<(), CollaborateError> {
  227. let (ret, rx) = oneshot::channel();
  228. let msg = DocumentCommand::Reset { repeated_revision, ret };
  229. let result = self.send(msg, rx).await?;
  230. result
  231. }
  232. async fn send<T>(&self, msg: DocumentCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
  233. let _ = self
  234. .sender
  235. .send(msg)
  236. .await
  237. .map_err(|e| CollaborateError::internal().context(format!("Send document command failed: {}", e)))?;
  238. Ok(rx.await.map_err(internal_error)?)
  239. }
  240. }
  241. impl std::ops::Drop for OpenDocumentHandler {
  242. fn drop(&mut self) {
  243. tracing::trace!("{} OpenDocHandle was dropped", self.doc_id);
  244. }
  245. }
  246. // #[derive(Debug)]
  247. enum DocumentCommand {
  248. ApplyRevisions {
  249. user: Arc<dyn RevisionUser>,
  250. repeated_revision: RepeatedRevision,
  251. ret: oneshot::Sender<CollaborateResult<()>>,
  252. },
  253. Ping {
  254. user: Arc<dyn RevisionUser>,
  255. rev_id: i64,
  256. ret: oneshot::Sender<CollaborateResult<()>>,
  257. },
  258. Reset {
  259. repeated_revision: RepeatedRevision,
  260. ret: oneshot::Sender<CollaborateResult<()>>,
  261. },
  262. }
  263. struct DocumentCommandRunner {
  264. pub doc_id: String,
  265. receiver: Option<mpsc::Receiver<DocumentCommand>>,
  266. synchronizer: Arc<DocumentRevisionSynchronizer>,
  267. }
  268. impl DocumentCommandRunner {
  269. fn new(
  270. doc_id: &str,
  271. receiver: mpsc::Receiver<DocumentCommand>,
  272. synchronizer: Arc<DocumentRevisionSynchronizer>,
  273. ) -> Self {
  274. Self {
  275. doc_id: doc_id.to_owned(),
  276. receiver: Some(receiver),
  277. synchronizer,
  278. }
  279. }
  280. async fn run(mut self) {
  281. let mut receiver = self
  282. .receiver
  283. .take()
  284. .expect("DocumentCommandRunner's receiver should only take one time");
  285. let stream = stream! {
  286. loop {
  287. match receiver.recv().await {
  288. Some(msg) => yield msg,
  289. None => break,
  290. }
  291. }
  292. };
  293. stream.for_each(|msg| self.handle_message(msg)).await;
  294. }
  295. async fn handle_message(&self, msg: DocumentCommand) {
  296. match msg {
  297. DocumentCommand::ApplyRevisions {
  298. user,
  299. repeated_revision,
  300. ret,
  301. } => {
  302. let result = self
  303. .synchronizer
  304. .sync_revisions(user, repeated_revision)
  305. .await
  306. .map_err(internal_error);
  307. let _ = ret.send(result);
  308. }
  309. DocumentCommand::Ping { user, rev_id, ret } => {
  310. let result = self.synchronizer.pong(user, rev_id).await.map_err(internal_error);
  311. let _ = ret.send(result);
  312. }
  313. DocumentCommand::Reset { repeated_revision, ret } => {
  314. let result = self.synchronizer.reset(repeated_revision).await.map_err(internal_error);
  315. let _ = ret.send(result);
  316. }
  317. }
  318. }
  319. }
  320. impl std::ops::Drop for DocumentCommandRunner {
  321. fn drop(&mut self) {
  322. tracing::trace!("{} DocumentCommandQueue was dropped", self.doc_id);
  323. }
  324. }