document_deps.rs 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. use bytes::Bytes;
  2. use flowy_collaboration::entities::ws::DocumentWSData;
  3. use flowy_database::ConnectionPool;
  4. use flowy_document::{
  5. context::DocumentUser,
  6. errors::{internal_error, FlowyError},
  7. services::doc::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver},
  8. };
  9. use flowy_net::services::ws::WsManager;
  10. use flowy_user::services::user::UserSession;
  11. use lib_ws::{WSMessage, WSMessageReceiver, WSModule};
  12. use std::{convert::TryInto, path::Path, sync::Arc};
  13. pub struct DocumentDepsResolver();
  14. impl DocumentDepsResolver {
  15. pub fn resolve(
  16. ws_manager: Arc<WsManager>,
  17. user_session: Arc<UserSession>,
  18. ) -> (
  19. Arc<dyn DocumentUser>,
  20. Arc<DocumentWSReceivers>,
  21. Arc<dyn DocumentWebSocket>,
  22. ) {
  23. let user = Arc::new(DocumentUserImpl { user: user_session });
  24. let ws_sender = Arc::new(DocumentWebSocketAdapter {
  25. ws_manager: ws_manager.clone(),
  26. });
  27. let ws_receivers = Arc::new(DocumentWSReceivers::new());
  28. let receiver = Arc::new(WSMessageReceiverAdaptor(ws_receivers.clone()));
  29. ws_manager.add_receiver(receiver).unwrap();
  30. (user, ws_receivers, ws_sender)
  31. }
  32. }
  33. struct DocumentUserImpl {
  34. user: Arc<UserSession>,
  35. }
  36. impl DocumentUserImpl {}
  37. impl DocumentUser for DocumentUserImpl {
  38. fn user_dir(&self) -> Result<String, FlowyError> {
  39. let dir = self
  40. .user
  41. .user_dir()
  42. .map_err(|e| FlowyError::unauthorized().context(e))?;
  43. let doc_dir = format!("{}/document", dir);
  44. if !Path::new(&doc_dir).exists() {
  45. let _ = std::fs::create_dir_all(&doc_dir)?;
  46. }
  47. Ok(doc_dir)
  48. }
  49. fn user_id(&self) -> Result<String, FlowyError> { self.user.user_id() }
  50. fn token(&self) -> Result<String, FlowyError> { self.user.token() }
  51. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> { self.user.db_pool() }
  52. }
  53. struct DocumentWebSocketAdapter {
  54. ws_manager: Arc<WsManager>,
  55. }
  56. impl DocumentWebSocket for DocumentWebSocketAdapter {
  57. fn send(&self, data: DocumentWSData) -> Result<(), FlowyError> {
  58. let bytes: Bytes = data.try_into().unwrap();
  59. let msg = WSMessage {
  60. module: WSModule::Doc,
  61. data: bytes.to_vec(),
  62. };
  63. let sender = self.ws_manager.ws_sender().map_err(internal_error)?;
  64. sender.send(msg).map_err(internal_error)?;
  65. Ok(())
  66. }
  67. fn subscribe_state_changed(&self) -> WSStateReceiver { self.ws_manager.subscribe_websocket_state() }
  68. }
  69. struct WSMessageReceiverAdaptor(Arc<DocumentWSReceivers>);
  70. impl WSMessageReceiver for WSMessageReceiverAdaptor {
  71. fn source(&self) -> WSModule { WSModule::Doc }
  72. fn receive_message(&self, msg: WSMessage) { self.0.did_receive_data(Bytes::from(msg.data)); }
  73. }