manager.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. use crate::editor::{initial_document_content, AppFlowyDocumentEditor};
  2. use crate::entities::EditParams;
  3. use crate::old_editor::editor::{DocumentRevisionCompress, OldDocumentEditor};
  4. use crate::{errors::FlowyError, DocumentCloudService};
  5. use bytes::Bytes;
  6. use dashmap::DashMap;
  7. use flowy_database::ConnectionPool;
  8. use flowy_error::FlowyResult;
  9. use flowy_revision::disk::SQLiteDocumentRevisionPersistence;
  10. use flowy_revision::{
  11. RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence,
  12. };
  13. use flowy_sync::client_document::initial_old_document_content;
  14. use flowy_sync::entities::{
  15. document::{DocumentIdPB, DocumentOperationsPB},
  16. revision::{md5, RepeatedRevision, Revision},
  17. ws_data::ServerRevisionWSData,
  18. };
  19. use lib_infra::future::FutureResult;
  20. use lib_ws::WSConnectState;
  21. use std::any::Any;
  22. use std::{convert::TryInto, sync::Arc};
  23. pub trait DocumentUser: Send + Sync {
  24. fn user_dir(&self) -> Result<String, FlowyError>;
  25. fn user_id(&self) -> Result<String, FlowyError>;
  26. fn token(&self) -> Result<String, FlowyError>;
  27. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
  28. }
  29. pub trait DocumentEditor: Send + Sync {
  30. fn get_operations_str(&self) -> FutureResult<String, FlowyError>;
  31. fn compose_local_operations(&self, data: Bytes) -> FutureResult<(), FlowyError>;
  32. fn close(&self);
  33. fn receive_ws_data(&self, data: ServerRevisionWSData) -> FutureResult<(), FlowyError>;
  34. fn receive_ws_state(&self, state: &WSConnectState);
  35. /// Returns the `Any` reference that can be used to downcast back to the original,
  36. /// concrete type.
  37. ///
  38. /// The indirection through `as_any` is because using `downcast_ref`
  39. /// on `Box<A>` *directly* only lets us downcast back to `&A` again. You can take a look at [this](https://stackoverflow.com/questions/33687447/how-to-get-a-reference-to-a-concrete-type-from-a-trait-object)
  40. /// for more information.
  41. ///
  42. ///
  43. fn as_any(&self) -> &dyn Any;
  44. }
  /// Settings that control how `DocumentManager` builds editors.
  #[derive(Clone, Debug)]
  pub struct DocumentConfig {
      /// When `true`, the new editor implementation and its initial content are used;
      /// otherwise the old editor is used.
      pub use_new_editor: bool,
  }
  49. pub struct DocumentManager {
  50. cloud_service: Arc<dyn DocumentCloudService>,
  51. rev_web_socket: Arc<dyn RevisionWebSocket>,
  52. editor_map: Arc<DocumentEditorMap>,
  53. user: Arc<dyn DocumentUser>,
  54. config: DocumentConfig,
  55. }
  56. impl DocumentManager {
  57. pub fn new(
  58. cloud_service: Arc<dyn DocumentCloudService>,
  59. document_user: Arc<dyn DocumentUser>,
  60. rev_web_socket: Arc<dyn RevisionWebSocket>,
  61. config: DocumentConfig,
  62. ) -> Self {
  63. Self {
  64. cloud_service,
  65. rev_web_socket,
  66. editor_map: Arc::new(DocumentEditorMap::new()),
  67. user: document_user,
  68. config,
  69. }
  70. }
  71. pub fn init(&self) -> FlowyResult<()> {
  72. listen_ws_state_changed(self.rev_web_socket.clone(), self.editor_map.clone());
  73. Ok(())
  74. }
  75. #[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
  76. pub async fn open_document_editor<T: AsRef<str>>(
  77. &self,
  78. editor_id: T,
  79. ) -> Result<Arc<dyn DocumentEditor>, FlowyError> {
  80. let editor_id = editor_id.as_ref();
  81. tracing::Span::current().record("editor_id", &editor_id);
  82. self.init_document_editor(editor_id).await
  83. }
  84. #[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
  85. pub fn close_document_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<(), FlowyError> {
  86. let editor_id = editor_id.as_ref();
  87. tracing::Span::current().record("editor_id", &editor_id);
  88. self.editor_map.remove(editor_id);
  89. Ok(())
  90. }
  91. #[tracing::instrument(level = "debug", skip(self, payload), err)]
  92. pub async fn receive_local_operations(
  93. &self,
  94. payload: DocumentOperationsPB,
  95. ) -> Result<DocumentOperationsPB, FlowyError> {
  96. let editor = self.get_document_editor(&payload.doc_id).await?;
  97. let _ = editor
  98. .compose_local_operations(Bytes::from(payload.operations_str))
  99. .await?;
  100. let operations_str = editor.get_operations_str().await?;
  101. Ok(DocumentOperationsPB {
  102. doc_id: payload.doc_id.clone(),
  103. operations_str,
  104. })
  105. }
  106. pub async fn apply_edit(&self, params: EditParams) -> FlowyResult<()> {
  107. let editor = self.get_document_editor(&params.doc_id).await?;
  108. let _ = editor.compose_local_operations(Bytes::from(params.operations)).await?;
  109. Ok(())
  110. }
  111. pub async fn create_document<T: AsRef<str>>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
  112. let doc_id = doc_id.as_ref().to_owned();
  113. let db_pool = self.user.db_pool()?;
  114. // Maybe we could save the document to disk without creating the RevisionManager
  115. let rev_manager = self.make_document_rev_manager(&doc_id, db_pool)?;
  116. let _ = rev_manager.reset_object(revisions).await?;
  117. Ok(())
  118. }
  119. pub async fn receive_ws_data(&self, data: Bytes) {
  120. let result: Result<ServerRevisionWSData, protobuf::ProtobufError> = data.try_into();
  121. match result {
  122. Ok(data) => match self.editor_map.get(&data.object_id) {
  123. None => tracing::error!("Can't find any source handler for {:?}-{:?}", data.object_id, data.ty),
  124. Some(editor) => match editor.receive_ws_data(data).await {
  125. Ok(_) => {}
  126. Err(e) => tracing::error!("{}", e),
  127. },
  128. },
  129. Err(e) => {
  130. tracing::error!("Document ws data parser failed: {:?}", e);
  131. }
  132. }
  133. }
  134. pub fn initial_document_content(&self) -> String {
  135. if self.config.use_new_editor {
  136. initial_document_content()
  137. } else {
  138. initial_old_document_content()
  139. }
  140. }
  141. }
  142. impl DocumentManager {
  143. /// Returns the `DocumentEditor`
  144. ///
  145. /// # Arguments
  146. ///
  147. /// * `doc_id`: the id of the document
  148. ///
  149. /// returns: Result<Arc<DocumentEditor>, FlowyError>
  150. ///
  151. async fn get_document_editor(&self, doc_id: &str) -> FlowyResult<Arc<dyn DocumentEditor>> {
  152. match self.editor_map.get(doc_id) {
  153. None => self.init_document_editor(doc_id).await,
  154. Some(editor) => Ok(editor),
  155. }
  156. }
  157. /// Initializes a document editor with the doc_id
  158. ///
  159. /// # Arguments
  160. ///
  161. /// * `doc_id`: the id of the document
  162. /// * `pool`: sqlite connection pool
  163. ///
  164. /// returns: Result<Arc<DocumentEditor>, FlowyError>
  165. ///
  166. #[tracing::instrument(level = "trace", skip(self), err)]
  167. pub async fn init_document_editor(&self, doc_id: &str) -> Result<Arc<dyn DocumentEditor>, FlowyError> {
  168. let pool = self.user.db_pool()?;
  169. let user = self.user.clone();
  170. let token = self.user.token()?;
  171. let rev_manager = self.make_document_rev_manager(doc_id, pool.clone())?;
  172. let cloud_service = Arc::new(DocumentRevisionCloudService {
  173. token,
  174. server: self.cloud_service.clone(),
  175. });
  176. let editor: Arc<dyn DocumentEditor> = if self.config.use_new_editor {
  177. let editor = AppFlowyDocumentEditor::new(doc_id, user, rev_manager, cloud_service).await?;
  178. Arc::new(editor)
  179. } else {
  180. let editor =
  181. OldDocumentEditor::new(doc_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service).await?;
  182. Arc::new(editor)
  183. };
  184. self.editor_map.insert(doc_id, editor.clone());
  185. Ok(editor)
  186. }
  187. fn make_document_rev_manager(
  188. &self,
  189. doc_id: &str,
  190. pool: Arc<ConnectionPool>,
  191. ) -> Result<RevisionManager, FlowyError> {
  192. let user_id = self.user.user_id()?;
  193. let disk_cache = SQLiteDocumentRevisionPersistence::new(&user_id, pool.clone());
  194. let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache);
  195. // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
  196. let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
  197. let rev_compactor = DocumentRevisionCompress();
  198. Ok(RevisionManager::new(
  199. &user_id,
  200. doc_id,
  201. rev_persistence,
  202. rev_compactor,
  203. // history_persistence,
  204. snapshot_persistence,
  205. ))
  206. }
  207. }
  208. struct DocumentRevisionCloudService {
  209. token: String,
  210. server: Arc<dyn DocumentCloudService>,
  211. }
  212. impl RevisionCloudService for DocumentRevisionCloudService {
  213. #[tracing::instrument(level = "trace", skip(self))]
  214. fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
  215. let params: DocumentIdPB = object_id.to_string().into();
  216. let server = self.server.clone();
  217. let token = self.token.clone();
  218. let user_id = user_id.to_string();
  219. FutureResult::new(async move {
  220. match server.fetch_document(&token, params).await? {
  221. None => Err(FlowyError::record_not_found().context("Remote doesn't have this document")),
  222. Some(payload) => {
  223. let bytes = Bytes::from(payload.content.clone());
  224. let doc_md5 = md5(&bytes);
  225. let revision = Revision::new(
  226. &payload.doc_id,
  227. payload.base_rev_id,
  228. payload.rev_id,
  229. bytes,
  230. &user_id,
  231. doc_md5,
  232. );
  233. Ok(vec![revision])
  234. }
  235. }
  236. })
  237. }
  238. }
  239. pub struct DocumentEditorMap {
  240. inner: DashMap<String, Arc<dyn DocumentEditor>>,
  241. }
  242. impl DocumentEditorMap {
  243. fn new() -> Self {
  244. Self { inner: DashMap::new() }
  245. }
  246. pub(crate) fn insert(&self, editor_id: &str, editor: Arc<dyn DocumentEditor>) {
  247. if self.inner.contains_key(editor_id) {
  248. log::warn!("Editor:{} already exists in cache", editor_id);
  249. }
  250. self.inner.insert(editor_id.to_string(), editor);
  251. }
  252. pub(crate) fn get(&self, editor_id: &str) -> Option<Arc<dyn DocumentEditor>> {
  253. Some(self.inner.get(editor_id)?.clone())
  254. }
  255. pub(crate) fn remove(&self, editor_id: &str) {
  256. if let Some(editor) = self.get(editor_id) {
  257. editor.close()
  258. }
  259. self.inner.remove(editor_id);
  260. }
  261. }
  262. #[tracing::instrument(level = "trace", skip(web_socket, handlers))]
  263. fn listen_ws_state_changed(web_socket: Arc<dyn RevisionWebSocket>, handlers: Arc<DocumentEditorMap>) {
  264. tokio::spawn(async move {
  265. let mut notify = web_socket.subscribe_state_changed().await;
  266. while let Ok(state) = notify.recv().await {
  267. handlers.inner.iter().for_each(|handler| {
  268. handler.receive_ws_state(&state);
  269. })
  270. }
  271. });
  272. }