manager.rs 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. use crate::entities::EditParams;
  2. use crate::queue::TextBlockRevisionCompactor;
  3. use crate::{editor::TextBlockEditor, errors::FlowyError, TextEditorCloudService};
  4. use bytes::Bytes;
  5. use dashmap::DashMap;
  6. use flowy_database::ConnectionPool;
  7. use flowy_error::FlowyResult;
  8. use flowy_revision::disk::SQLiteTextBlockRevisionPersistence;
  9. use flowy_revision::{
  10. RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence,
  11. };
  12. use flowy_sync::entities::{
  13. revision::{md5, RepeatedRevision, Revision},
  14. text_block::{TextBlockDeltaPB, TextBlockIdPB},
  15. ws_data::ServerRevisionWSData,
  16. };
  17. use lib_infra::future::FutureResult;
  18. use std::{convert::TryInto, sync::Arc};
  19. pub trait TextEditorUser: Send + Sync {
  20. fn user_dir(&self) -> Result<String, FlowyError>;
  21. fn user_id(&self) -> Result<String, FlowyError>;
  22. fn token(&self) -> Result<String, FlowyError>;
  23. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
  24. }
  25. pub struct TextEditorManager {
  26. cloud_service: Arc<dyn TextEditorCloudService>,
  27. rev_web_socket: Arc<dyn RevisionWebSocket>,
  28. editor_map: Arc<TextEditorMap>,
  29. user: Arc<dyn TextEditorUser>,
  30. }
  31. impl TextEditorManager {
  32. pub fn new(
  33. cloud_service: Arc<dyn TextEditorCloudService>,
  34. text_block_user: Arc<dyn TextEditorUser>,
  35. rev_web_socket: Arc<dyn RevisionWebSocket>,
  36. ) -> Self {
  37. Self {
  38. cloud_service,
  39. rev_web_socket,
  40. editor_map: Arc::new(TextEditorMap::new()),
  41. user: text_block_user,
  42. }
  43. }
  44. pub fn init(&self) -> FlowyResult<()> {
  45. listen_ws_state_changed(self.rev_web_socket.clone(), self.editor_map.clone());
  46. Ok(())
  47. }
  48. #[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
  49. pub async fn open_text_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<Arc<TextBlockEditor>, FlowyError> {
  50. let editor_id = editor_id.as_ref();
  51. tracing::Span::current().record("editor_id", &editor_id);
  52. self.get_text_editor(editor_id).await
  53. }
  54. #[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
  55. pub fn close_text_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<(), FlowyError> {
  56. let editor_id = editor_id.as_ref();
  57. tracing::Span::current().record("editor_id", &editor_id);
  58. self.editor_map.remove(editor_id);
  59. Ok(())
  60. }
  61. #[tracing::instrument(level = "debug", skip(self, delta), err)]
  62. pub async fn receive_local_delta(&self, delta: TextBlockDeltaPB) -> Result<TextBlockDeltaPB, FlowyError> {
  63. let editor = self.get_text_editor(&delta.text_block_id).await?;
  64. let _ = editor.compose_local_delta(Bytes::from(delta.delta_str)).await?;
  65. let delta_str = editor.delta_str().await?;
  66. Ok(TextBlockDeltaPB {
  67. text_block_id: delta.text_block_id.clone(),
  68. delta_str,
  69. })
  70. }
  71. pub async fn apply_edit(&self, params: EditParams) -> FlowyResult<()> {
  72. let editor = self.get_text_editor(&params.text_block_id).await?;
  73. let _ = editor.compose_local_delta(Bytes::from(params.delta)).await?;
  74. Ok(())
  75. }
  76. pub async fn create_text_block<T: AsRef<str>>(
  77. &self,
  78. text_block_id: T,
  79. revisions: RepeatedRevision,
  80. ) -> FlowyResult<()> {
  81. let doc_id = text_block_id.as_ref().to_owned();
  82. let db_pool = self.user.db_pool()?;
  83. // Maybe we could save the block to disk without creating the RevisionManager
  84. let rev_manager = self.make_text_block_rev_manager(&doc_id, db_pool)?;
  85. let _ = rev_manager.reset_object(revisions).await?;
  86. Ok(())
  87. }
  88. pub async fn receive_ws_data(&self, data: Bytes) {
  89. let result: Result<ServerRevisionWSData, protobuf::ProtobufError> = data.try_into();
  90. match result {
  91. Ok(data) => match self.editor_map.get(&data.object_id) {
  92. None => tracing::error!("Can't find any source handler for {:?}-{:?}", data.object_id, data.ty),
  93. Some(editor) => match editor.receive_ws_data(data).await {
  94. Ok(_) => {}
  95. Err(e) => tracing::error!("{}", e),
  96. },
  97. },
  98. Err(e) => {
  99. tracing::error!("Document ws data parser failed: {:?}", e);
  100. }
  101. }
  102. }
  103. }
  104. impl TextEditorManager {
  105. async fn get_text_editor(&self, block_id: &str) -> FlowyResult<Arc<TextBlockEditor>> {
  106. match self.editor_map.get(block_id) {
  107. None => {
  108. let db_pool = self.user.db_pool()?;
  109. self.make_text_editor(block_id, db_pool).await
  110. }
  111. Some(editor) => Ok(editor),
  112. }
  113. }
  114. #[tracing::instrument(level = "trace", skip(self, pool), err)]
  115. async fn make_text_editor(
  116. &self,
  117. block_id: &str,
  118. pool: Arc<ConnectionPool>,
  119. ) -> Result<Arc<TextBlockEditor>, FlowyError> {
  120. let user = self.user.clone();
  121. let token = self.user.token()?;
  122. let rev_manager = self.make_text_block_rev_manager(block_id, pool.clone())?;
  123. let cloud_service = Arc::new(TextBlockRevisionCloudService {
  124. token,
  125. server: self.cloud_service.clone(),
  126. });
  127. let doc_editor =
  128. TextBlockEditor::new(block_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service).await?;
  129. self.editor_map.insert(block_id, &doc_editor);
  130. Ok(doc_editor)
  131. }
  132. fn make_text_block_rev_manager(
  133. &self,
  134. doc_id: &str,
  135. pool: Arc<ConnectionPool>,
  136. ) -> Result<RevisionManager, FlowyError> {
  137. let user_id = self.user.user_id()?;
  138. let disk_cache = SQLiteTextBlockRevisionPersistence::new(&user_id, pool.clone());
  139. let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache);
  140. // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
  141. let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
  142. let rev_compactor = TextBlockRevisionCompactor();
  143. Ok(RevisionManager::new(
  144. &user_id,
  145. doc_id,
  146. rev_persistence,
  147. rev_compactor,
  148. // history_persistence,
  149. snapshot_persistence,
  150. ))
  151. }
  152. }
  153. struct TextBlockRevisionCloudService {
  154. token: String,
  155. server: Arc<dyn TextEditorCloudService>,
  156. }
  157. impl RevisionCloudService for TextBlockRevisionCloudService {
  158. #[tracing::instrument(level = "trace", skip(self))]
  159. fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
  160. let params: TextBlockIdPB = object_id.to_string().into();
  161. let server = self.server.clone();
  162. let token = self.token.clone();
  163. let user_id = user_id.to_string();
  164. FutureResult::new(async move {
  165. match server.read_text_block(&token, params).await? {
  166. None => Err(FlowyError::record_not_found().context("Remote doesn't have this document")),
  167. Some(doc) => {
  168. let delta_data = Bytes::from(doc.text.clone());
  169. let doc_md5 = md5(&delta_data);
  170. let revision = Revision::new(
  171. &doc.block_id,
  172. doc.base_rev_id,
  173. doc.rev_id,
  174. delta_data,
  175. &user_id,
  176. doc_md5,
  177. );
  178. Ok(vec![revision])
  179. }
  180. }
  181. })
  182. }
  183. }
  184. pub struct TextEditorMap {
  185. inner: DashMap<String, Arc<TextBlockEditor>>,
  186. }
  187. impl TextEditorMap {
  188. fn new() -> Self {
  189. Self { inner: DashMap::new() }
  190. }
  191. pub(crate) fn insert(&self, editor_id: &str, doc: &Arc<TextBlockEditor>) {
  192. if self.inner.contains_key(editor_id) {
  193. log::warn!("Doc:{} already exists in cache", editor_id);
  194. }
  195. self.inner.insert(editor_id.to_string(), doc.clone());
  196. }
  197. pub(crate) fn get(&self, editor_id: &str) -> Option<Arc<TextBlockEditor>> {
  198. Some(self.inner.get(editor_id)?.clone())
  199. }
  200. pub(crate) fn remove(&self, editor_id: &str) {
  201. if let Some(editor) = self.get(editor_id) {
  202. editor.stop()
  203. }
  204. self.inner.remove(editor_id);
  205. }
  206. }
  207. #[tracing::instrument(level = "trace", skip(web_socket, handlers))]
  208. fn listen_ws_state_changed(web_socket: Arc<dyn RevisionWebSocket>, handlers: Arc<TextEditorMap>) {
  209. tokio::spawn(async move {
  210. let mut notify = web_socket.subscribe_state_changed().await;
  211. while let Ok(state) = notify.recv().await {
  212. handlers.inner.iter().for_each(|handler| {
  213. handler.receive_ws_state(&state);
  214. })
  215. }
  216. });
  217. }