manager.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. use crate::editor::{initial_document_content, AppFlowyDocumentEditor, DocumentRevisionMergeable};
  2. use crate::entities::{DocumentVersionPB, EditParams};
  3. use crate::old_editor::editor::{DeltaDocumentEditor, DeltaDocumentRevisionMergeable};
  4. use crate::services::rev_sqlite::{SQLiteDeltaDocumentRevisionPersistence, SQLiteDocumentRevisionPersistence};
  5. use crate::services::DocumentPersistence;
  6. use crate::{errors::FlowyError, DocumentCloudService};
  7. use bytes::Bytes;
  8. use flowy_database::ConnectionPool;
  9. use flowy_error::FlowyResult;
  10. use flowy_http_model::util::md5;
  11. use flowy_http_model::{document::DocumentIdPB, revision::Revision, ws_data::ServerRevisionWSData};
  12. use flowy_revision::{
  13. PhantomSnapshotPersistence, RevisionCloudService, RevisionManager, RevisionPersistence,
  14. RevisionPersistenceConfiguration, RevisionWebSocket,
  15. };
  16. use flowy_sync::client_document::initial_delta_document_content;
  17. use lib_infra::async_trait::async_trait;
  18. use lib_infra::future::FutureResult;
  19. use lib_infra::ref_map::{RefCountHashMap, RefCountValue};
  20. use lib_ws::WSConnectState;
  21. use std::any::Any;
  22. use std::{convert::TryInto, sync::Arc};
  23. use tokio::sync::RwLock;
  24. pub trait DocumentUser: Send + Sync {
  25. fn user_dir(&self) -> Result<String, FlowyError>;
  26. fn user_id(&self) -> Result<String, FlowyError>;
  27. fn token(&self) -> Result<String, FlowyError>;
  28. }
  29. pub trait DocumentDatabase: Send + Sync {
  30. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
  31. }
  32. #[async_trait]
  33. pub trait DocumentEditor: Send + Sync {
  34. /// Called when the document get closed
  35. async fn close(&self);
  36. /// Exports the document content. The content is encoded in the corresponding
  37. /// editor data format.
  38. fn export(&self) -> FutureResult<String, FlowyError>;
  39. /// Duplicate the document inner data into String
  40. fn duplicate(&self) -> FutureResult<String, FlowyError>;
  41. fn receive_ws_data(&self, data: ServerRevisionWSData) -> FutureResult<(), FlowyError>;
  42. fn receive_ws_state(&self, state: &WSConnectState);
  43. /// Receives the local operations made by the user input. The operations are encoded
  44. /// in binary format.
  45. fn compose_local_operations(&self, data: Bytes) -> FutureResult<(), FlowyError>;
  46. /// Returns the `Any` reference that can be used to downcast back to the original,
  47. /// concrete type.
  48. ///
  49. /// The indirection through `as_any` is because using `downcast_ref`
  50. /// on `Box<A>` *directly* only lets us downcast back to `&A` again. You can take a look at [this](https://stackoverflow.com/questions/33687447/how-to-get-a-reference-to-a-concrete-type-from-a-trait-object)
  51. /// for more information.
  52. ///
  53. ///
  54. fn as_any(&self) -> &dyn Any;
  55. }
  56. #[derive(Clone, Debug)]
  57. pub struct DocumentConfig {
  58. pub version: DocumentVersionPB,
  59. }
  60. impl std::default::Default for DocumentConfig {
  61. fn default() -> Self {
  62. Self {
  63. version: DocumentVersionPB::V1,
  64. }
  65. }
  66. }
  67. pub struct DocumentManager {
  68. cloud_service: Arc<dyn DocumentCloudService>,
  69. rev_web_socket: Arc<dyn RevisionWebSocket>,
  70. editor_map: Arc<RwLock<RefCountHashMap<RefCountDocumentHandler>>>,
  71. user: Arc<dyn DocumentUser>,
  72. persistence: Arc<DocumentPersistence>,
  73. #[allow(dead_code)]
  74. config: DocumentConfig,
  75. }
  76. impl DocumentManager {
  77. pub fn new(
  78. cloud_service: Arc<dyn DocumentCloudService>,
  79. document_user: Arc<dyn DocumentUser>,
  80. database: Arc<dyn DocumentDatabase>,
  81. rev_web_socket: Arc<dyn RevisionWebSocket>,
  82. config: DocumentConfig,
  83. ) -> Self {
  84. Self {
  85. cloud_service,
  86. rev_web_socket,
  87. editor_map: Arc::new(RwLock::new(RefCountHashMap::new())),
  88. user: document_user,
  89. persistence: Arc::new(DocumentPersistence::new(database)),
  90. config,
  91. }
  92. }
  93. /// Called immediately after the application launched with the user sign in/sign up.
  94. #[tracing::instrument(level = "trace", skip_all, err)]
  95. pub async fn initialize(&self, user_id: &str) -> FlowyResult<()> {
  96. let _ = self.persistence.initialize(user_id)?;
  97. listen_ws_state_changed(self.rev_web_socket.clone(), self.editor_map.clone());
  98. Ok(())
  99. }
  100. pub async fn initialize_with_new_user(&self, _user_id: &str, _token: &str) -> FlowyResult<()> {
  101. Ok(())
  102. }
  103. #[tracing::instrument(level = "trace", skip_all, fields(document_id), err)]
  104. pub async fn open_document_editor<T: AsRef<str>>(
  105. &self,
  106. document_id: T,
  107. ) -> Result<Arc<dyn DocumentEditor>, FlowyError> {
  108. let document_id = document_id.as_ref();
  109. tracing::Span::current().record("document_id", &document_id);
  110. self.init_document_editor(document_id).await
  111. }
  112. #[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
  113. pub async fn close_document_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<(), FlowyError> {
  114. let editor_id = editor_id.as_ref();
  115. tracing::Span::current().record("editor_id", &editor_id);
  116. self.editor_map.write().await.remove(editor_id).await;
  117. Ok(())
  118. }
  119. pub async fn apply_edit(&self, params: EditParams) -> FlowyResult<()> {
  120. let editor = self.get_document_editor(&params.doc_id).await?;
  121. let _ = editor.compose_local_operations(Bytes::from(params.operations)).await?;
  122. Ok(())
  123. }
  124. pub async fn create_document<T: AsRef<str>>(&self, doc_id: T, revisions: Vec<Revision>) -> FlowyResult<()> {
  125. let doc_id = doc_id.as_ref().to_owned();
  126. let db_pool = self.persistence.database.db_pool()?;
  127. // Maybe we could save the document to disk without creating the RevisionManager
  128. let rev_manager = self.make_rev_manager(&doc_id, db_pool)?;
  129. let _ = rev_manager.reset_object(revisions).await?;
  130. Ok(())
  131. }
  132. pub async fn receive_ws_data(&self, data: Bytes) {
  133. let result: Result<ServerRevisionWSData, protobuf::ProtobufError> = data.try_into();
  134. match result {
  135. Ok(data) => match self.editor_map.read().await.get(&data.object_id) {
  136. None => tracing::error!("Can't find any source handler for {:?}-{:?}", data.object_id, data.ty),
  137. Some(handler) => match handler.0.receive_ws_data(data).await {
  138. Ok(_) => {}
  139. Err(e) => tracing::error!("{}", e),
  140. },
  141. },
  142. Err(e) => {
  143. tracing::error!("Document ws data parser failed: {:?}", e);
  144. }
  145. }
  146. }
  147. pub fn initial_document_content(&self) -> String {
  148. match self.config.version {
  149. DocumentVersionPB::V0 => initial_delta_document_content(),
  150. DocumentVersionPB::V1 => initial_document_content(),
  151. }
  152. }
  153. }
  154. impl DocumentManager {
  155. /// Returns the `DocumentEditor`
  156. ///
  157. /// # Arguments
  158. ///
  159. /// * `doc_id`: the id of the document
  160. ///
  161. /// returns: Result<Arc<DocumentEditor>, FlowyError>
  162. ///
  163. async fn get_document_editor(&self, doc_id: &str) -> FlowyResult<Arc<dyn DocumentEditor>> {
  164. match self.editor_map.read().await.get(doc_id) {
  165. None => {
  166. //
  167. tracing::warn!("Should call init_document_editor first");
  168. self.init_document_editor(doc_id).await
  169. }
  170. Some(handler) => Ok(handler.0.clone()),
  171. }
  172. }
  173. /// Initializes a document editor with the doc_id
  174. ///
  175. /// # Arguments
  176. ///
  177. /// * `doc_id`: the id of the document
  178. /// * `pool`: sqlite connection pool
  179. ///
  180. /// returns: Result<Arc<DocumentEditor>, FlowyError>
  181. ///
  182. #[tracing::instrument(level = "trace", skip(self), err)]
  183. pub async fn init_document_editor(&self, doc_id: &str) -> Result<Arc<dyn DocumentEditor>, FlowyError> {
  184. let pool = self.persistence.database.db_pool()?;
  185. let user = self.user.clone();
  186. let token = self.user.token()?;
  187. let cloud_service = Arc::new(DocumentRevisionCloudService {
  188. token,
  189. server: self.cloud_service.clone(),
  190. });
  191. match self.config.version {
  192. DocumentVersionPB::V0 => {
  193. let rev_manager = self.make_delta_document_rev_manager(doc_id, pool.clone())?;
  194. let editor: Arc<dyn DocumentEditor> = Arc::new(
  195. DeltaDocumentEditor::new(doc_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service)
  196. .await?,
  197. );
  198. self.editor_map
  199. .write()
  200. .await
  201. .insert(doc_id.to_string(), RefCountDocumentHandler(editor.clone()));
  202. Ok(editor)
  203. }
  204. DocumentVersionPB::V1 => {
  205. let rev_manager = self.make_document_rev_manager(doc_id, pool.clone())?;
  206. let editor: Arc<dyn DocumentEditor> =
  207. Arc::new(AppFlowyDocumentEditor::new(doc_id, user, rev_manager, cloud_service).await?);
  208. self.editor_map
  209. .write()
  210. .await
  211. .insert(doc_id.to_string(), RefCountDocumentHandler(editor.clone()));
  212. Ok(editor)
  213. }
  214. }
  215. }
  216. fn make_rev_manager(
  217. &self,
  218. doc_id: &str,
  219. pool: Arc<ConnectionPool>,
  220. ) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> {
  221. match self.config.version {
  222. DocumentVersionPB::V0 => self.make_delta_document_rev_manager(doc_id, pool),
  223. DocumentVersionPB::V1 => self.make_document_rev_manager(doc_id, pool),
  224. }
  225. }
  226. fn make_document_rev_manager(
  227. &self,
  228. doc_id: &str,
  229. pool: Arc<ConnectionPool>,
  230. ) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> {
  231. let user_id = self.user.user_id()?;
  232. let disk_cache = SQLiteDocumentRevisionPersistence::new(&user_id, pool);
  233. let configuration = RevisionPersistenceConfiguration::new(100, true);
  234. let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration);
  235. Ok(RevisionManager::new(
  236. &user_id,
  237. doc_id,
  238. rev_persistence,
  239. DocumentRevisionMergeable(),
  240. PhantomSnapshotPersistence(),
  241. ))
  242. }
  243. fn make_delta_document_rev_manager(
  244. &self,
  245. doc_id: &str,
  246. pool: Arc<ConnectionPool>,
  247. ) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> {
  248. let user_id = self.user.user_id()?;
  249. let disk_cache = SQLiteDeltaDocumentRevisionPersistence::new(&user_id, pool);
  250. let configuration = RevisionPersistenceConfiguration::new(100, true);
  251. let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration);
  252. Ok(RevisionManager::new(
  253. &user_id,
  254. doc_id,
  255. rev_persistence,
  256. DeltaDocumentRevisionMergeable(),
  257. PhantomSnapshotPersistence(),
  258. ))
  259. }
  260. }
  261. struct DocumentRevisionCloudService {
  262. token: String,
  263. server: Arc<dyn DocumentCloudService>,
  264. }
  265. impl RevisionCloudService for DocumentRevisionCloudService {
  266. #[tracing::instrument(level = "trace", skip(self))]
  267. fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
  268. let params: DocumentIdPB = object_id.to_string().into();
  269. let server = self.server.clone();
  270. let token = self.token.clone();
  271. FutureResult::new(async move {
  272. match server.fetch_document(&token, params).await? {
  273. None => Err(FlowyError::record_not_found().context("Remote doesn't have this document")),
  274. Some(payload) => {
  275. let bytes = Bytes::from(payload.data.clone());
  276. let doc_md5 = md5(&bytes);
  277. let revision = Revision::new(&payload.doc_id, payload.base_rev_id, payload.rev_id, bytes, doc_md5);
  278. Ok(vec![revision])
  279. }
  280. }
  281. })
  282. }
  283. }
  284. #[derive(Clone)]
  285. struct RefCountDocumentHandler(Arc<dyn DocumentEditor>);
  286. #[async_trait]
  287. impl RefCountValue for RefCountDocumentHandler {
  288. async fn did_remove(&self) {
  289. self.0.close().await;
  290. }
  291. }
  292. impl std::ops::Deref for RefCountDocumentHandler {
  293. type Target = Arc<dyn DocumentEditor>;
  294. fn deref(&self) -> &Self::Target {
  295. &self.0
  296. }
  297. }
  298. #[tracing::instrument(level = "trace", skip(web_socket, handlers))]
  299. fn listen_ws_state_changed(
  300. web_socket: Arc<dyn RevisionWebSocket>,
  301. handlers: Arc<RwLock<RefCountHashMap<RefCountDocumentHandler>>>,
  302. ) {
  303. tokio::spawn(async move {
  304. let mut notify = web_socket.subscribe_state_changed().await;
  305. while let Ok(state) = notify.recv().await {
  306. handlers.read().await.values().iter().for_each(|handler| {
  307. handler.receive_ws_state(&state);
  308. })
  309. }
  310. });
  311. }