manager.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. use crate::editor::{initial_document_content, AppFlowyDocumentEditor, DocumentRevisionCompress};
  2. use crate::entities::{DocumentVersionPB, EditParams};
  3. use crate::old_editor::editor::{DeltaDocumentEditor, DeltaDocumentRevisionCompress};
  4. use crate::services::rev_sqlite::{SQLiteDeltaDocumentRevisionPersistence, SQLiteDocumentRevisionPersistence};
  5. use crate::services::DocumentPersistence;
  6. use crate::{errors::FlowyError, DocumentCloudService};
  7. use bytes::Bytes;
  8. use flowy_database::ConnectionPool;
  9. use flowy_error::FlowyResult;
  10. use flowy_http_model::util::md5;
  11. use flowy_http_model::{document::DocumentIdPB, revision::Revision, ws_data::ServerRevisionWSData};
  12. use flowy_revision::{
  13. RevisionCloudService, RevisionManager, RevisionPersistence, RevisionPersistenceConfiguration, RevisionWebSocket,
  14. SQLiteRevisionSnapshotPersistence,
  15. };
  16. use flowy_sync::client_document::initial_delta_document_content;
  17. use lib_infra::future::FutureResult;
  18. use lib_infra::ref_map::{RefCountHashMap, RefCountValue};
  19. use lib_ws::WSConnectState;
  20. use std::any::Any;
  21. use std::{convert::TryInto, sync::Arc};
  22. use tokio::sync::RwLock;
  23. pub trait DocumentUser: Send + Sync {
  24. fn user_dir(&self) -> Result<String, FlowyError>;
  25. fn user_id(&self) -> Result<String, FlowyError>;
  26. fn token(&self) -> Result<String, FlowyError>;
  27. }
  28. pub trait DocumentDatabase: Send + Sync {
  29. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
  30. }
  31. pub trait DocumentEditor: Send + Sync {
  32. /// Called when the document get closed
  33. fn close(&self);
  34. /// Exports the document content. The content is encoded in the corresponding
  35. /// editor data format.
  36. fn export(&self) -> FutureResult<String, FlowyError>;
  37. /// Duplicate the document inner data into String
  38. fn duplicate(&self) -> FutureResult<String, FlowyError>;
  39. fn receive_ws_data(&self, data: ServerRevisionWSData) -> FutureResult<(), FlowyError>;
  40. fn receive_ws_state(&self, state: &WSConnectState);
  41. /// Receives the local operations made by the user input. The operations are encoded
  42. /// in binary format.
  43. fn compose_local_operations(&self, data: Bytes) -> FutureResult<(), FlowyError>;
  44. /// Returns the `Any` reference that can be used to downcast back to the original,
  45. /// concrete type.
  46. ///
  47. /// The indirection through `as_any` is because using `downcast_ref`
  48. /// on `Box<A>` *directly* only lets us downcast back to `&A` again. You can take a look at [this](https://stackoverflow.com/questions/33687447/how-to-get-a-reference-to-a-concrete-type-from-a-trait-object)
  49. /// for more information.
  50. ///
  51. ///
  52. fn as_any(&self) -> &dyn Any;
  53. }
  54. #[derive(Clone, Debug)]
  55. pub struct DocumentConfig {
  56. pub version: DocumentVersionPB,
  57. }
  58. impl std::default::Default for DocumentConfig {
  59. fn default() -> Self {
  60. Self {
  61. version: DocumentVersionPB::V1,
  62. }
  63. }
  64. }
  65. pub struct DocumentManager {
  66. cloud_service: Arc<dyn DocumentCloudService>,
  67. rev_web_socket: Arc<dyn RevisionWebSocket>,
  68. editor_map: Arc<RwLock<RefCountHashMap<RefCountDocumentHandler>>>,
  69. user: Arc<dyn DocumentUser>,
  70. persistence: Arc<DocumentPersistence>,
  71. #[allow(dead_code)]
  72. config: DocumentConfig,
  73. }
  74. impl DocumentManager {
  75. pub fn new(
  76. cloud_service: Arc<dyn DocumentCloudService>,
  77. document_user: Arc<dyn DocumentUser>,
  78. database: Arc<dyn DocumentDatabase>,
  79. rev_web_socket: Arc<dyn RevisionWebSocket>,
  80. config: DocumentConfig,
  81. ) -> Self {
  82. Self {
  83. cloud_service,
  84. rev_web_socket,
  85. editor_map: Arc::new(RwLock::new(RefCountHashMap::new())),
  86. user: document_user,
  87. persistence: Arc::new(DocumentPersistence::new(database)),
  88. config,
  89. }
  90. }
  91. /// Called immediately after the application launched with the user sign in/sign up.
  92. #[tracing::instrument(level = "trace", skip_all, err)]
  93. pub async fn initialize(&self, user_id: &str) -> FlowyResult<()> {
  94. let _ = self.persistence.initialize(user_id)?;
  95. listen_ws_state_changed(self.rev_web_socket.clone(), self.editor_map.clone());
  96. Ok(())
  97. }
  98. pub async fn initialize_with_new_user(&self, _user_id: &str, _token: &str) -> FlowyResult<()> {
  99. Ok(())
  100. }
  101. #[tracing::instrument(level = "trace", skip_all, fields(document_id), err)]
  102. pub async fn open_document_editor<T: AsRef<str>>(
  103. &self,
  104. document_id: T,
  105. ) -> Result<Arc<dyn DocumentEditor>, FlowyError> {
  106. let document_id = document_id.as_ref();
  107. tracing::Span::current().record("document_id", &document_id);
  108. self.init_document_editor(document_id).await
  109. }
  110. #[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
  111. pub async fn close_document_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<(), FlowyError> {
  112. let editor_id = editor_id.as_ref();
  113. tracing::Span::current().record("editor_id", &editor_id);
  114. self.editor_map.write().await.remove(editor_id);
  115. Ok(())
  116. }
  117. pub async fn apply_edit(&self, params: EditParams) -> FlowyResult<()> {
  118. let editor = self.get_document_editor(&params.doc_id).await?;
  119. let _ = editor.compose_local_operations(Bytes::from(params.operations)).await?;
  120. Ok(())
  121. }
  122. pub async fn create_document<T: AsRef<str>>(&self, doc_id: T, revisions: Vec<Revision>) -> FlowyResult<()> {
  123. let doc_id = doc_id.as_ref().to_owned();
  124. let db_pool = self.persistence.database.db_pool()?;
  125. // Maybe we could save the document to disk without creating the RevisionManager
  126. let rev_manager = self.make_rev_manager(&doc_id, db_pool)?;
  127. let _ = rev_manager.reset_object(revisions).await?;
  128. Ok(())
  129. }
  130. pub async fn receive_ws_data(&self, data: Bytes) {
  131. let result: Result<ServerRevisionWSData, protobuf::ProtobufError> = data.try_into();
  132. match result {
  133. Ok(data) => match self.editor_map.read().await.get(&data.object_id) {
  134. None => tracing::error!("Can't find any source handler for {:?}-{:?}", data.object_id, data.ty),
  135. Some(handler) => match handler.0.receive_ws_data(data).await {
  136. Ok(_) => {}
  137. Err(e) => tracing::error!("{}", e),
  138. },
  139. },
  140. Err(e) => {
  141. tracing::error!("Document ws data parser failed: {:?}", e);
  142. }
  143. }
  144. }
  145. pub fn initial_document_content(&self) -> String {
  146. match self.config.version {
  147. DocumentVersionPB::V0 => initial_delta_document_content(),
  148. DocumentVersionPB::V1 => initial_document_content(),
  149. }
  150. }
  151. }
  152. impl DocumentManager {
  153. /// Returns the `DocumentEditor`
  154. ///
  155. /// # Arguments
  156. ///
  157. /// * `doc_id`: the id of the document
  158. ///
  159. /// returns: Result<Arc<DocumentEditor>, FlowyError>
  160. ///
  161. async fn get_document_editor(&self, doc_id: &str) -> FlowyResult<Arc<dyn DocumentEditor>> {
  162. match self.editor_map.read().await.get(doc_id) {
  163. None => {
  164. //
  165. tracing::warn!("Should call init_document_editor first");
  166. self.init_document_editor(doc_id).await
  167. }
  168. Some(handler) => Ok(handler.0.clone()),
  169. }
  170. }
  171. /// Initializes a document editor with the doc_id
  172. ///
  173. /// # Arguments
  174. ///
  175. /// * `doc_id`: the id of the document
  176. /// * `pool`: sqlite connection pool
  177. ///
  178. /// returns: Result<Arc<DocumentEditor>, FlowyError>
  179. ///
  180. #[tracing::instrument(level = "trace", skip(self), err)]
  181. pub async fn init_document_editor(&self, doc_id: &str) -> Result<Arc<dyn DocumentEditor>, FlowyError> {
  182. let pool = self.persistence.database.db_pool()?;
  183. let user = self.user.clone();
  184. let token = self.user.token()?;
  185. let cloud_service = Arc::new(DocumentRevisionCloudService {
  186. token,
  187. server: self.cloud_service.clone(),
  188. });
  189. match self.config.version {
  190. DocumentVersionPB::V0 => {
  191. let rev_manager = self.make_delta_document_rev_manager(doc_id, pool.clone())?;
  192. let editor: Arc<dyn DocumentEditor> = Arc::new(
  193. DeltaDocumentEditor::new(doc_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service)
  194. .await?,
  195. );
  196. self.editor_map
  197. .write()
  198. .await
  199. .insert(doc_id.to_string(), RefCountDocumentHandler(editor.clone()));
  200. Ok(editor)
  201. }
  202. DocumentVersionPB::V1 => {
  203. let rev_manager = self.make_document_rev_manager(doc_id, pool.clone())?;
  204. let editor: Arc<dyn DocumentEditor> =
  205. Arc::new(AppFlowyDocumentEditor::new(doc_id, user, rev_manager, cloud_service).await?);
  206. self.editor_map
  207. .write()
  208. .await
  209. .insert(doc_id.to_string(), RefCountDocumentHandler(editor.clone()));
  210. Ok(editor)
  211. }
  212. }
  213. }
  214. fn make_rev_manager(
  215. &self,
  216. doc_id: &str,
  217. pool: Arc<ConnectionPool>,
  218. ) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> {
  219. match self.config.version {
  220. DocumentVersionPB::V0 => self.make_delta_document_rev_manager(doc_id, pool),
  221. DocumentVersionPB::V1 => self.make_document_rev_manager(doc_id, pool),
  222. }
  223. }
  224. fn make_document_rev_manager(
  225. &self,
  226. doc_id: &str,
  227. pool: Arc<ConnectionPool>,
  228. ) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> {
  229. let user_id = self.user.user_id()?;
  230. let disk_cache = SQLiteDocumentRevisionPersistence::new(&user_id, pool.clone());
  231. let configuration = RevisionPersistenceConfiguration::new(100, true);
  232. let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration);
  233. // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
  234. let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
  235. Ok(RevisionManager::new(
  236. &user_id,
  237. doc_id,
  238. rev_persistence,
  239. DocumentRevisionCompress(),
  240. // history_persistence,
  241. snapshot_persistence,
  242. ))
  243. }
  244. fn make_delta_document_rev_manager(
  245. &self,
  246. doc_id: &str,
  247. pool: Arc<ConnectionPool>,
  248. ) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> {
  249. let user_id = self.user.user_id()?;
  250. let disk_cache = SQLiteDeltaDocumentRevisionPersistence::new(&user_id, pool.clone());
  251. let configuration = RevisionPersistenceConfiguration::new(100, true);
  252. let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration);
  253. // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
  254. let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
  255. Ok(RevisionManager::new(
  256. &user_id,
  257. doc_id,
  258. rev_persistence,
  259. DeltaDocumentRevisionCompress(),
  260. // history_persistence,
  261. snapshot_persistence,
  262. ))
  263. }
  264. }
  265. struct DocumentRevisionCloudService {
  266. token: String,
  267. server: Arc<dyn DocumentCloudService>,
  268. }
  269. impl RevisionCloudService for DocumentRevisionCloudService {
  270. #[tracing::instrument(level = "trace", skip(self))]
  271. fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
  272. let params: DocumentIdPB = object_id.to_string().into();
  273. let server = self.server.clone();
  274. let token = self.token.clone();
  275. FutureResult::new(async move {
  276. match server.fetch_document(&token, params).await? {
  277. None => Err(FlowyError::record_not_found().context("Remote doesn't have this document")),
  278. Some(payload) => {
  279. let bytes = Bytes::from(payload.data.clone());
  280. let doc_md5 = md5(&bytes);
  281. let revision = Revision::new(&payload.doc_id, payload.base_rev_id, payload.rev_id, bytes, doc_md5);
  282. Ok(vec![revision])
  283. }
  284. }
  285. })
  286. }
  287. }
  288. #[derive(Clone)]
  289. struct RefCountDocumentHandler(Arc<dyn DocumentEditor>);
  290. impl RefCountValue for RefCountDocumentHandler {
  291. fn did_remove(&self) {
  292. self.0.close();
  293. }
  294. }
  295. impl std::ops::Deref for RefCountDocumentHandler {
  296. type Target = Arc<dyn DocumentEditor>;
  297. fn deref(&self) -> &Self::Target {
  298. &self.0
  299. }
  300. }
  301. #[tracing::instrument(level = "trace", skip(web_socket, handlers))]
  302. fn listen_ws_state_changed(
  303. web_socket: Arc<dyn RevisionWebSocket>,
  304. handlers: Arc<RwLock<RefCountHashMap<RefCountDocumentHandler>>>,
  305. ) {
  306. tokio::spawn(async move {
  307. let mut notify = web_socket.subscribe_state_changed().await;
  308. while let Ok(state) = notify.recv().await {
  309. handlers.read().await.values().iter().for_each(|handler| {
  310. handler.receive_ws_state(&state);
  311. })
  312. }
  313. });
  314. }