document_deps.rs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. use backend_service::configuration::ClientServerConfiguration;
  2. use bytes::Bytes;
  3. use flowy_collaboration::entities::{
  4. doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams},
  5. ws::DocumentClientWSData,
  6. };
  7. use flowy_database::ConnectionPool;
  8. use flowy_document::{
  9. context::DocumentUser,
  10. core::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver},
  11. errors::{internal_error, FlowyError},
  12. DocumentCloudService,
  13. };
  14. use flowy_net::{
  15. cloud::document::{DocumentHttpCloudService, DocumentLocalCloudService},
  16. ws::connection::FlowyWebSocketConnect,
  17. };
  18. use flowy_user::services::UserSession;
  19. use lib_infra::future::FutureResult;
  20. use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage};
  21. use std::{convert::TryInto, path::Path, sync::Arc};
  22. pub struct DocumentDependencies {
  23. pub user: Arc<dyn DocumentUser>,
  24. pub ws_receivers: Arc<DocumentWSReceivers>,
  25. pub ws_sender: Arc<dyn DocumentWebSocket>,
  26. pub cloud_service: Arc<dyn DocumentCloudService>,
  27. }
  28. pub struct DocumentDepsResolver();
  29. impl DocumentDepsResolver {
  30. pub fn resolve(
  31. ws_conn: Arc<FlowyWebSocketConnect>,
  32. user_session: Arc<UserSession>,
  33. server_config: &ClientServerConfiguration,
  34. ) -> DocumentDependencies {
  35. let user = Arc::new(DocumentUserImpl(user_session));
  36. let ws_sender = Arc::new(DocumentWebSocketImpl(ws_conn.clone()));
  37. let ws_receivers = Arc::new(DocumentWSReceivers::new());
  38. let receiver = Arc::new(WSMessageReceiverImpl(ws_receivers.clone()));
  39. ws_conn.add_ws_message_receiver(receiver).unwrap();
  40. let cloud_service = make_document_cloud_service(server_config);
  41. DocumentDependencies {
  42. user,
  43. ws_receivers,
  44. ws_sender,
  45. cloud_service,
  46. }
  47. }
  48. }
  49. struct DocumentUserImpl(Arc<UserSession>);
  50. impl DocumentUser for DocumentUserImpl {
  51. fn user_dir(&self) -> Result<String, FlowyError> {
  52. let dir = self.0.user_dir().map_err(|e| FlowyError::unauthorized().context(e))?;
  53. let doc_dir = format!("{}/document", dir);
  54. if !Path::new(&doc_dir).exists() {
  55. let _ = std::fs::create_dir_all(&doc_dir)?;
  56. }
  57. Ok(doc_dir)
  58. }
  59. fn user_id(&self) -> Result<String, FlowyError> { self.0.user_id() }
  60. fn token(&self) -> Result<String, FlowyError> { self.0.token() }
  61. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> { self.0.db_pool() }
  62. }
  63. struct DocumentWebSocketImpl(Arc<FlowyWebSocketConnect>);
  64. impl DocumentWebSocket for DocumentWebSocketImpl {
  65. fn send(&self, data: DocumentClientWSData) -> Result<(), FlowyError> {
  66. let bytes: Bytes = data.try_into().unwrap();
  67. let msg = WebSocketRawMessage {
  68. module: WSModule::Doc,
  69. data: bytes.to_vec(),
  70. };
  71. let sender = self.0.ws_sender()?;
  72. sender.send(msg).map_err(internal_error)?;
  73. Ok(())
  74. }
  75. fn subscribe_state_changed(&self) -> WSStateReceiver { self.0.subscribe_websocket_state() }
  76. }
  77. struct WSMessageReceiverImpl(Arc<DocumentWSReceivers>);
  78. impl WSMessageReceiver for WSMessageReceiverImpl {
  79. fn source(&self) -> WSModule { WSModule::Doc }
  80. fn receive_message(&self, msg: WebSocketRawMessage) {
  81. let receivers = self.0.clone();
  82. tokio::spawn(async move {
  83. receivers.did_receive_data(Bytes::from(msg.data)).await;
  84. });
  85. }
  86. }
  87. fn make_document_cloud_service(server_config: &ClientServerConfiguration) -> Arc<dyn DocumentCloudService> {
  88. if cfg!(feature = "http_server") {
  89. Arc::new(DocumentHttpCloudServiceAdaptor::new(server_config.clone()))
  90. } else {
  91. Arc::new(DocumentLocalCloudServiceAdaptor::new())
  92. }
  93. }
  94. struct DocumentHttpCloudServiceAdaptor(DocumentHttpCloudService);
  95. impl DocumentHttpCloudServiceAdaptor {
  96. fn new(config: ClientServerConfiguration) -> Self {
  97. DocumentHttpCloudServiceAdaptor(DocumentHttpCloudService::new(config))
  98. }
  99. }
  100. impl DocumentCloudService for DocumentHttpCloudServiceAdaptor {
  101. fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> {
  102. self.0.create_document_request(token, params)
  103. }
  104. fn read_document(&self, token: &str, params: DocumentId) -> FutureResult<Option<DocumentInfo>, FlowyError> {
  105. self.0.read_document_request(token, params)
  106. }
  107. fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
  108. self.0.update_document_request(token, params)
  109. }
  110. }
  111. struct DocumentLocalCloudServiceAdaptor(DocumentLocalCloudService);
  112. impl DocumentLocalCloudServiceAdaptor {
  113. fn new() -> Self { Self(DocumentLocalCloudService {}) }
  114. }
  115. impl DocumentCloudService for DocumentLocalCloudServiceAdaptor {
  116. fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> {
  117. self.0.create_document_request(token, params)
  118. }
  119. fn read_document(&self, token: &str, params: DocumentId) -> FutureResult<Option<DocumentInfo>, FlowyError> {
  120. self.0.read_document_request(token, params)
  121. }
  122. fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
  123. self.0.update_document_request(token, params)
  124. }
  125. }