manager.rs 8.6 KB


  1. use crate::queue::TextBlockRevisionCompactor;
  2. use crate::{editor::TextBlockEditor, errors::FlowyError, BlockCloudService};
  3. use bytes::Bytes;
  4. use dashmap::DashMap;
  5. use flowy_database::ConnectionPool;
  6. use flowy_error::FlowyResult;
  7. use flowy_revision::disk::SQLiteTextBlockRevisionPersistence;
  8. use flowy_revision::{
  9. RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence,
  10. };
  11. use flowy_sync::entities::{
  12. revision::{md5, RepeatedRevision, Revision},
  13. text_block::{TextBlockDeltaPB, TextBlockIdPB},
  14. ws_data::ServerRevisionWSData,
  15. };
  16. use lib_infra::future::FutureResult;
  17. use std::{convert::TryInto, sync::Arc};
  18. pub trait TextBlockUser: Send + Sync {
  19. fn user_dir(&self) -> Result<String, FlowyError>;
  20. fn user_id(&self) -> Result<String, FlowyError>;
  21. fn token(&self) -> Result<String, FlowyError>;
  22. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
  23. }
  24. pub struct TextBlockManager {
  25. cloud_service: Arc<dyn BlockCloudService>,
  26. rev_web_socket: Arc<dyn RevisionWebSocket>,
  27. editor_map: Arc<TextBlockEditorMap>,
  28. user: Arc<dyn TextBlockUser>,
  29. }
  30. impl TextBlockManager {
  31. pub fn new(
  32. cloud_service: Arc<dyn BlockCloudService>,
  33. text_block_user: Arc<dyn TextBlockUser>,
  34. rev_web_socket: Arc<dyn RevisionWebSocket>,
  35. ) -> Self {
  36. Self {
  37. cloud_service,
  38. rev_web_socket,
  39. editor_map: Arc::new(TextBlockEditorMap::new()),
  40. user: text_block_user,
  41. }
  42. }
  43. pub fn init(&self) -> FlowyResult<()> {
  44. listen_ws_state_changed(self.rev_web_socket.clone(), self.editor_map.clone());
  45. Ok(())
  46. }
  47. #[tracing::instrument(level = "trace", skip(self, block_id), fields(block_id), err)]
  48. pub async fn open_block<T: AsRef<str>>(&self, block_id: T) -> Result<Arc<TextBlockEditor>, FlowyError> {
  49. let block_id = block_id.as_ref();
  50. tracing::Span::current().record("block_id", &block_id);
  51. self.get_block_editor(block_id).await
  52. }
  53. #[tracing::instrument(level = "trace", skip(self, block_id), fields(block_id), err)]
  54. pub fn close_block<T: AsRef<str>>(&self, block_id: T) -> Result<(), FlowyError> {
  55. let block_id = block_id.as_ref();
  56. tracing::Span::current().record("block_id", &block_id);
  57. self.editor_map.remove(block_id);
  58. Ok(())
  59. }
  60. #[tracing::instrument(level = "debug", skip(self, doc_id), fields(doc_id), err)]
  61. pub fn delete_block<T: AsRef<str>>(&self, doc_id: T) -> Result<(), FlowyError> {
  62. let doc_id = doc_id.as_ref();
  63. tracing::Span::current().record("doc_id", &doc_id);
  64. self.editor_map.remove(doc_id);
  65. Ok(())
  66. }
  67. #[tracing::instrument(level = "debug", skip(self, delta), fields(doc_id = %delta.block_id), err)]
  68. pub async fn receive_local_delta(&self, delta: TextBlockDeltaPB) -> Result<TextBlockDeltaPB, FlowyError> {
  69. let editor = self.get_block_editor(&delta.block_id).await?;
  70. let _ = editor.compose_local_delta(Bytes::from(delta.delta_str)).await?;
  71. let document_json = editor.delta_str().await?;
  72. Ok(TextBlockDeltaPB {
  73. block_id: delta.block_id.clone(),
  74. delta_str: document_json,
  75. })
  76. }
  77. pub async fn create_block<T: AsRef<str>>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
  78. let doc_id = doc_id.as_ref().to_owned();
  79. let db_pool = self.user.db_pool()?;
  80. // Maybe we could save the block to disk without creating the RevisionManager
  81. let rev_manager = self.make_rev_manager(&doc_id, db_pool)?;
  82. let _ = rev_manager.reset_object(revisions).await?;
  83. Ok(())
  84. }
  85. pub async fn receive_ws_data(&self, data: Bytes) {
  86. let result: Result<ServerRevisionWSData, protobuf::ProtobufError> = data.try_into();
  87. match result {
  88. Ok(data) => match self.editor_map.get(&data.object_id) {
  89. None => tracing::error!("Can't find any source handler for {:?}-{:?}", data.object_id, data.ty),
  90. Some(editor) => match editor.receive_ws_data(data).await {
  91. Ok(_) => {}
  92. Err(e) => tracing::error!("{}", e),
  93. },
  94. },
  95. Err(e) => {
  96. tracing::error!("Document ws data parser failed: {:?}", e);
  97. }
  98. }
  99. }
  100. }
  101. impl TextBlockManager {
  102. async fn get_block_editor(&self, block_id: &str) -> FlowyResult<Arc<TextBlockEditor>> {
  103. match self.editor_map.get(block_id) {
  104. None => {
  105. let db_pool = self.user.db_pool()?;
  106. self.make_text_block_editor(block_id, db_pool).await
  107. }
  108. Some(editor) => Ok(editor),
  109. }
  110. }
  111. #[tracing::instrument(level = "trace", skip(self, pool), err)]
  112. async fn make_text_block_editor(
  113. &self,
  114. block_id: &str,
  115. pool: Arc<ConnectionPool>,
  116. ) -> Result<Arc<TextBlockEditor>, FlowyError> {
  117. let user = self.user.clone();
  118. let token = self.user.token()?;
  119. let rev_manager = self.make_rev_manager(block_id, pool.clone())?;
  120. let cloud_service = Arc::new(TextBlockRevisionCloudService {
  121. token,
  122. server: self.cloud_service.clone(),
  123. });
  124. let doc_editor =
  125. TextBlockEditor::new(block_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service).await?;
  126. self.editor_map.insert(block_id, &doc_editor);
  127. Ok(doc_editor)
  128. }
  129. fn make_rev_manager(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<RevisionManager, FlowyError> {
  130. let user_id = self.user.user_id()?;
  131. let disk_cache = SQLiteTextBlockRevisionPersistence::new(&user_id, pool.clone());
  132. let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache);
  133. // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
  134. let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
  135. let rev_compactor = TextBlockRevisionCompactor();
  136. Ok(RevisionManager::new(
  137. &user_id,
  138. doc_id,
  139. rev_persistence,
  140. rev_compactor,
  141. // history_persistence,
  142. snapshot_persistence,
  143. ))
  144. }
  145. }
  146. struct TextBlockRevisionCloudService {
  147. token: String,
  148. server: Arc<dyn BlockCloudService>,
  149. }
  150. impl RevisionCloudService for TextBlockRevisionCloudService {
  151. #[tracing::instrument(level = "trace", skip(self))]
  152. fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
  153. let params: TextBlockIdPB = object_id.to_string().into();
  154. let server = self.server.clone();
  155. let token = self.token.clone();
  156. let user_id = user_id.to_string();
  157. FutureResult::new(async move {
  158. match server.read_block(&token, params).await? {
  159. None => Err(FlowyError::record_not_found().context("Remote doesn't have this document")),
  160. Some(doc) => {
  161. let delta_data = Bytes::from(doc.text.clone());
  162. let doc_md5 = md5(&delta_data);
  163. let revision = Revision::new(
  164. &doc.block_id,
  165. doc.base_rev_id,
  166. doc.rev_id,
  167. delta_data,
  168. &user_id,
  169. doc_md5,
  170. );
  171. Ok(vec![revision])
  172. }
  173. }
  174. })
  175. }
  176. }
  177. pub struct TextBlockEditorMap {
  178. inner: DashMap<String, Arc<TextBlockEditor>>,
  179. }
  180. impl TextBlockEditorMap {
  181. fn new() -> Self {
  182. Self { inner: DashMap::new() }
  183. }
  184. pub(crate) fn insert(&self, block_id: &str, doc: &Arc<TextBlockEditor>) {
  185. if self.inner.contains_key(block_id) {
  186. log::warn!("Doc:{} already exists in cache", block_id);
  187. }
  188. self.inner.insert(block_id.to_string(), doc.clone());
  189. }
  190. pub(crate) fn get(&self, block_id: &str) -> Option<Arc<TextBlockEditor>> {
  191. Some(self.inner.get(block_id)?.clone())
  192. }
  193. pub(crate) fn remove(&self, block_id: &str) {
  194. if let Some(editor) = self.get(block_id) {
  195. editor.stop()
  196. }
  197. self.inner.remove(block_id);
  198. }
  199. }
  200. #[tracing::instrument(level = "trace", skip(web_socket, handlers))]
  201. fn listen_ws_state_changed(web_socket: Arc<dyn RevisionWebSocket>, handlers: Arc<TextBlockEditorMap>) {
  202. tokio::spawn(async move {
  203. let mut notify = web_socket.subscribe_state_changed().await;
  204. while let Ok(state) = notify.recv().await {
  205. handlers.inner.iter().for_each(|handler| {
  206. handler.receive_ws_state(&state);
  207. })
  208. }
  209. });
  210. }