folder_deps.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. use bytes::Bytes;
  2. use flowy_database::ConnectionPool;
  3. use flowy_folder::entities::{ViewDataTypePB, ViewLayoutTypePB};
  4. use flowy_folder::manager::{ViewDataProcessor, ViewDataProcessorMap};
  5. use flowy_folder::{
  6. errors::{internal_error, FlowyError},
  7. event_map::{FolderCouldServiceV1, WorkspaceDatabase, WorkspaceUser},
  8. manager::FolderManager,
  9. };
  10. use flowy_grid::manager::{make_grid_view_data, GridManager};
  11. use flowy_grid::util::{make_default_board, make_default_grid};
  12. use flowy_grid_data_model::revision::BuildGridContext;
  13. use flowy_net::ClientServerConfiguration;
  14. use flowy_net::{
  15. http_server::folder::FolderHttpCloudService, local_server::LocalServer, ws::connection::FlowyWebSocketConnect,
  16. };
  17. use flowy_revision::{RevisionWebSocket, WSStateReceiver};
  18. use flowy_sync::client_document::default::initial_quill_delta_string;
  19. use flowy_sync::entities::revision::{RepeatedRevision, Revision};
  20. use flowy_sync::entities::ws_data::ClientRevisionWSData;
  21. use flowy_text_block::TextBlockManager;
  22. use flowy_user::services::UserSession;
  23. use futures_core::future::BoxFuture;
  24. use lib_infra::future::{BoxResultFuture, FutureResult};
  25. use lib_ws::{WSChannel, WSMessageReceiver, WebSocketRawMessage};
  26. use std::collections::HashMap;
  27. use std::convert::TryFrom;
  28. use std::{convert::TryInto, sync::Arc};
  29. pub struct FolderDepsResolver();
  30. impl FolderDepsResolver {
  31. pub async fn resolve(
  32. local_server: Option<Arc<LocalServer>>,
  33. user_session: Arc<UserSession>,
  34. server_config: &ClientServerConfiguration,
  35. ws_conn: &Arc<FlowyWebSocketConnect>,
  36. text_block_manager: &Arc<TextBlockManager>,
  37. grid_manager: &Arc<GridManager>,
  38. ) -> Arc<FolderManager> {
  39. let user: Arc<dyn WorkspaceUser> = Arc::new(WorkspaceUserImpl(user_session.clone()));
  40. let database: Arc<dyn WorkspaceDatabase> = Arc::new(WorkspaceDatabaseImpl(user_session));
  41. let web_socket = Arc::new(FolderWebSocket(ws_conn.clone()));
  42. let cloud_service: Arc<dyn FolderCouldServiceV1> = match local_server {
  43. None => Arc::new(FolderHttpCloudService::new(server_config.clone())),
  44. Some(local_server) => local_server,
  45. };
  46. let view_data_processor = make_view_data_processor(text_block_manager.clone(), grid_manager.clone());
  47. let folder_manager =
  48. Arc::new(FolderManager::new(user.clone(), cloud_service, database, view_data_processor, web_socket).await);
  49. if let (Ok(user_id), Ok(token)) = (user.user_id(), user.token()) {
  50. match folder_manager.initialize(&user_id, &token).await {
  51. Ok(_) => {}
  52. Err(e) => tracing::error!("Initialize folder manager failed: {}", e),
  53. }
  54. }
  55. let receiver = Arc::new(FolderWSMessageReceiverImpl(folder_manager.clone()));
  56. ws_conn.add_ws_message_receiver(receiver).unwrap();
  57. folder_manager
  58. }
  59. }
  60. fn make_view_data_processor(
  61. text_block_manager: Arc<TextBlockManager>,
  62. grid_manager: Arc<GridManager>,
  63. ) -> ViewDataProcessorMap {
  64. let mut map: HashMap<ViewDataTypePB, Arc<dyn ViewDataProcessor + Send + Sync>> = HashMap::new();
  65. let block_data_impl = TextBlockViewDataProcessor(text_block_manager);
  66. map.insert(block_data_impl.data_type(), Arc::new(block_data_impl));
  67. let grid_data_impl = GridViewDataProcessor(grid_manager);
  68. map.insert(grid_data_impl.data_type(), Arc::new(grid_data_impl));
  69. Arc::new(map)
  70. }
  71. struct WorkspaceDatabaseImpl(Arc<UserSession>);
  72. impl WorkspaceDatabase for WorkspaceDatabaseImpl {
  73. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> {
  74. self.0.db_pool().map_err(|e| FlowyError::internal().context(e))
  75. }
  76. }
  77. struct WorkspaceUserImpl(Arc<UserSession>);
  78. impl WorkspaceUser for WorkspaceUserImpl {
  79. fn user_id(&self) -> Result<String, FlowyError> {
  80. self.0.user_id().map_err(|e| FlowyError::internal().context(e))
  81. }
  82. fn token(&self) -> Result<String, FlowyError> {
  83. self.0.token().map_err(|e| FlowyError::internal().context(e))
  84. }
  85. }
  86. struct FolderWebSocket(Arc<FlowyWebSocketConnect>);
  87. impl RevisionWebSocket for FolderWebSocket {
  88. fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> {
  89. let bytes: Bytes = data.try_into().unwrap();
  90. let msg = WebSocketRawMessage {
  91. channel: WSChannel::Folder,
  92. data: bytes.to_vec(),
  93. };
  94. let ws_conn = self.0.clone();
  95. Box::pin(async move {
  96. match ws_conn.web_socket().await? {
  97. None => {}
  98. Some(sender) => {
  99. sender.send(msg).map_err(internal_error)?;
  100. }
  101. }
  102. Ok(())
  103. })
  104. }
  105. fn subscribe_state_changed(&self) -> BoxFuture<WSStateReceiver> {
  106. let ws_conn = self.0.clone();
  107. Box::pin(async move { ws_conn.subscribe_websocket_state().await })
  108. }
  109. }
  110. struct FolderWSMessageReceiverImpl(Arc<FolderManager>);
  111. impl WSMessageReceiver for FolderWSMessageReceiverImpl {
  112. fn source(&self) -> WSChannel {
  113. WSChannel::Folder
  114. }
  115. fn receive_message(&self, msg: WebSocketRawMessage) {
  116. let handler = self.0.clone();
  117. tokio::spawn(async move {
  118. handler.did_receive_ws_data(Bytes::from(msg.data)).await;
  119. });
  120. }
  121. }
  122. struct TextBlockViewDataProcessor(Arc<TextBlockManager>);
  123. impl ViewDataProcessor for TextBlockViewDataProcessor {
  124. fn initialize(&self) -> FutureResult<(), FlowyError> {
  125. let manager = self.0.clone();
  126. FutureResult::new(async move { manager.init() })
  127. }
  128. fn create_container(&self, user_id: &str, view_id: &str, delta_data: Bytes) -> FutureResult<(), FlowyError> {
  129. let repeated_revision: RepeatedRevision = Revision::initial_revision(user_id, view_id, delta_data).into();
  130. let view_id = view_id.to_string();
  131. let manager = self.0.clone();
  132. FutureResult::new(async move {
  133. let _ = manager.create_block(view_id, repeated_revision).await?;
  134. Ok(())
  135. })
  136. }
  137. fn delete_container(&self, view_id: &str) -> FutureResult<(), FlowyError> {
  138. let manager = self.0.clone();
  139. let view_id = view_id.to_string();
  140. FutureResult::new(async move {
  141. let _ = manager.delete_block(view_id)?;
  142. Ok(())
  143. })
  144. }
  145. fn close_container(&self, view_id: &str) -> FutureResult<(), FlowyError> {
  146. let manager = self.0.clone();
  147. let view_id = view_id.to_string();
  148. FutureResult::new(async move {
  149. let _ = manager.close_block(view_id)?;
  150. Ok(())
  151. })
  152. }
  153. fn get_delta_data(&self, view_id: &str) -> FutureResult<Bytes, FlowyError> {
  154. let view_id = view_id.to_string();
  155. let manager = self.0.clone();
  156. FutureResult::new(async move {
  157. let editor = manager.open_block(view_id).await?;
  158. let delta_bytes = Bytes::from(editor.delta_str().await?);
  159. Ok(delta_bytes)
  160. })
  161. }
  162. fn create_default_view(
  163. &self,
  164. user_id: &str,
  165. view_id: &str,
  166. layout: ViewLayoutTypePB,
  167. ) -> FutureResult<Bytes, FlowyError> {
  168. let user_id = user_id.to_string();
  169. let view_id = view_id.to_string();
  170. let manager = self.0.clone();
  171. FutureResult::new(async move {
  172. let view_data = initial_quill_delta_string();
  173. let delta_data = Bytes::from(view_data);
  174. let repeated_revision: RepeatedRevision =
  175. Revision::initial_revision(&user_id, &view_id, delta_data.clone()).into();
  176. let _ = manager.create_block(view_id, repeated_revision).await?;
  177. Ok(delta_data)
  178. })
  179. }
  180. fn create_view_from_delta_data(
  181. &self,
  182. _user_id: &str,
  183. _view_id: &str,
  184. data: Vec<u8>,
  185. ) -> FutureResult<Bytes, FlowyError> {
  186. FutureResult::new(async move { Ok(Bytes::from(data)) })
  187. }
  188. fn data_type(&self) -> ViewDataTypePB {
  189. ViewDataTypePB::Text
  190. }
  191. }
  192. struct GridViewDataProcessor(Arc<GridManager>);
  193. impl ViewDataProcessor for GridViewDataProcessor {
  194. fn initialize(&self) -> FutureResult<(), FlowyError> {
  195. FutureResult::new(async { Ok(()) })
  196. }
  197. fn create_container(&self, user_id: &str, view_id: &str, delta_data: Bytes) -> FutureResult<(), FlowyError> {
  198. let repeated_revision: RepeatedRevision = Revision::initial_revision(user_id, view_id, delta_data).into();
  199. let view_id = view_id.to_string();
  200. let grid_manager = self.0.clone();
  201. FutureResult::new(async move {
  202. let _ = grid_manager.create_grid(view_id, repeated_revision).await?;
  203. Ok(())
  204. })
  205. }
  206. fn delete_container(&self, view_id: &str) -> FutureResult<(), FlowyError> {
  207. let grid_manager = self.0.clone();
  208. let view_id = view_id.to_string();
  209. FutureResult::new(async move {
  210. let _ = grid_manager.delete_grid(view_id).await?;
  211. Ok(())
  212. })
  213. }
  214. fn close_container(&self, view_id: &str) -> FutureResult<(), FlowyError> {
  215. let grid_manager = self.0.clone();
  216. let view_id = view_id.to_string();
  217. FutureResult::new(async move {
  218. let _ = grid_manager.close_grid(view_id).await?;
  219. Ok(())
  220. })
  221. }
  222. fn get_delta_data(&self, view_id: &str) -> FutureResult<Bytes, FlowyError> {
  223. let view_id = view_id.to_string();
  224. let grid_manager = self.0.clone();
  225. FutureResult::new(async move {
  226. let editor = grid_manager.open_grid(view_id).await?;
  227. let delta_bytes = editor.duplicate_grid().await?;
  228. Ok(delta_bytes.into())
  229. })
  230. }
  231. fn create_default_view(
  232. &self,
  233. user_id: &str,
  234. view_id: &str,
  235. layout: ViewLayoutTypePB,
  236. ) -> FutureResult<Bytes, FlowyError> {
  237. let build_context = match layout {
  238. ViewLayoutTypePB::Grid => make_default_grid(),
  239. ViewLayoutTypePB::Board => make_default_board(),
  240. ViewLayoutTypePB::Document => {
  241. return FutureResult::new(async move {
  242. Err(FlowyError::internal().context(format!("Can't handle {:?} layout type", layout)))
  243. });
  244. }
  245. };
  246. let user_id = user_id.to_string();
  247. let view_id = view_id.to_string();
  248. let grid_manager = self.0.clone();
  249. FutureResult::new(async move { make_grid_view_data(&user_id, &view_id, grid_manager, build_context).await })
  250. }
  251. fn create_view_from_delta_data(
  252. &self,
  253. user_id: &str,
  254. view_id: &str,
  255. data: Vec<u8>,
  256. ) -> FutureResult<Bytes, FlowyError> {
  257. let user_id = user_id.to_string();
  258. let view_id = view_id.to_string();
  259. let grid_manager = self.0.clone();
  260. FutureResult::new(async move {
  261. let bytes = Bytes::from(data);
  262. let build_context = BuildGridContext::try_from(bytes)?;
  263. make_grid_view_data(&user_id, &view_id, grid_manager, build_context).await
  264. })
  265. }
  266. fn data_type(&self) -> ViewDataTypePB {
  267. ViewDataTypePB::Database
  268. }
  269. }