grid_deps.rs 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. use crate::FlowyError;
  2. use bytes::Bytes;
  3. use flowy_database::ConnectionPool;
  4. use flowy_grid::manager::{GridManager, GridUser};
  5. use flowy_grid::services::persistence::GridDatabase;
  6. use flowy_http_model::ws_data::ClientRevisionWSData;
  7. use flowy_net::ws::connection::FlowyWebSocketConnect;
  8. use flowy_revision::{RevisionWebSocket, WSStateReceiver};
  9. use flowy_task::TaskDispatcher;
  10. use flowy_user::services::UserSession;
  11. use futures_core::future::BoxFuture;
  12. use lib_infra::future::BoxResultFuture;
  13. use lib_ws::{WSChannel, WebSocketRawMessage};
  14. use std::convert::TryInto;
  15. use std::sync::Arc;
  16. use tokio::sync::RwLock;
  17. pub struct GridDepsResolver();
  18. impl GridDepsResolver {
  19. pub async fn resolve(
  20. ws_conn: Arc<FlowyWebSocketConnect>,
  21. user_session: Arc<UserSession>,
  22. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  23. ) -> Arc<GridManager> {
  24. let user = Arc::new(GridUserImpl(user_session.clone()));
  25. let rev_web_socket = Arc::new(GridRevisionWebSocket(ws_conn));
  26. let grid_manager = Arc::new(GridManager::new(
  27. user.clone(),
  28. rev_web_socket,
  29. task_scheduler,
  30. Arc::new(GridDatabaseImpl(user_session)),
  31. ));
  32. if let (Ok(user_id), Ok(token)) = (user.user_id(), user.token()) {
  33. match grid_manager.initialize(&user_id, &token).await {
  34. Ok(_) => {}
  35. Err(e) => tracing::error!("Initialize grid manager failed: {}", e),
  36. }
  37. }
  38. grid_manager
  39. }
  40. }
  41. struct GridDatabaseImpl(Arc<UserSession>);
  42. impl GridDatabase for GridDatabaseImpl {
  43. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> {
  44. self.0.db_pool().map_err(|e| FlowyError::internal().context(e))
  45. }
  46. }
  47. struct GridUserImpl(Arc<UserSession>);
  48. impl GridUser for GridUserImpl {
  49. fn user_id(&self) -> Result<String, FlowyError> {
  50. self.0.user_id()
  51. }
  52. fn token(&self) -> Result<String, FlowyError> {
  53. self.0.token()
  54. }
  55. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> {
  56. self.0.db_pool()
  57. }
  58. }
  59. struct GridRevisionWebSocket(Arc<FlowyWebSocketConnect>);
  60. impl RevisionWebSocket for GridRevisionWebSocket {
  61. fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> {
  62. let bytes: Bytes = data.try_into().unwrap();
  63. let msg = WebSocketRawMessage {
  64. channel: WSChannel::Grid,
  65. data: bytes.to_vec(),
  66. };
  67. let ws_conn = self.0.clone();
  68. Box::pin(async move {
  69. match ws_conn.web_socket().await? {
  70. None => {}
  71. Some(sender) => {
  72. sender.send(msg).map_err(|e| FlowyError::internal().context(e))?;
  73. }
  74. }
  75. Ok(())
  76. })
  77. }
  78. fn subscribe_state_changed(&self) -> BoxFuture<WSStateReceiver> {
  79. let ws_conn = self.0.clone();
  80. Box::pin(async move { ws_conn.subscribe_websocket_state().await })
  81. }
  82. }