grid_deps.rs 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. use crate::FlowyError;
  2. use bytes::Bytes;
  3. use flowy_database::ConnectionPool;
  4. use flowy_grid::manager::{GridManager, GridUser};
  5. use flowy_grid::services::persistence::GridDatabase;
  6. use flowy_net::ws::connection::FlowyWebSocketConnect;
  7. use flowy_revision::{RevisionWebSocket, WSStateReceiver};
  8. use flowy_sync::entities::ws_data::ClientRevisionWSData;
  9. use flowy_user::services::UserSession;
  10. use futures_core::future::BoxFuture;
  11. use lib_infra::future::BoxResultFuture;
  12. use lib_ws::{WSChannel, WebSocketRawMessage};
  13. use std::convert::TryInto;
  14. use std::sync::Arc;
  15. pub struct GridDepsResolver();
  16. impl GridDepsResolver {
  17. pub async fn resolve(ws_conn: Arc<FlowyWebSocketConnect>, user_session: Arc<UserSession>) -> Arc<GridManager> {
  18. let user = Arc::new(GridUserImpl(user_session.clone()));
  19. let rev_web_socket = Arc::new(GridWebSocket(ws_conn));
  20. let grid_manager = Arc::new(GridManager::new(
  21. user.clone(),
  22. rev_web_socket,
  23. Arc::new(GridDatabaseImpl(user_session)),
  24. ));
  25. if let (Ok(user_id), Ok(token)) = (user.user_id(), user.token()) {
  26. match grid_manager.initialize(&user_id, &token).await {
  27. Ok(_) => {}
  28. Err(e) => tracing::error!("Initialize grid manager failed: {}", e),
  29. }
  30. }
  31. grid_manager
  32. }
  33. }
  34. struct GridDatabaseImpl(Arc<UserSession>);
  35. impl GridDatabase for GridDatabaseImpl {
  36. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> {
  37. self.0.db_pool().map_err(|e| FlowyError::internal().context(e))
  38. }
  39. }
  40. struct GridUserImpl(Arc<UserSession>);
  41. impl GridUser for GridUserImpl {
  42. fn user_id(&self) -> Result<String, FlowyError> {
  43. self.0.user_id()
  44. }
  45. fn token(&self) -> Result<String, FlowyError> {
  46. self.0.token()
  47. }
  48. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> {
  49. self.0.db_pool()
  50. }
  51. }
  52. struct GridWebSocket(Arc<FlowyWebSocketConnect>);
  53. impl RevisionWebSocket for GridWebSocket {
  54. fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> {
  55. let bytes: Bytes = data.try_into().unwrap();
  56. let msg = WebSocketRawMessage {
  57. channel: WSChannel::Grid,
  58. data: bytes.to_vec(),
  59. };
  60. let ws_conn = self.0.clone();
  61. Box::pin(async move {
  62. match ws_conn.web_socket().await? {
  63. None => {}
  64. Some(sender) => {
  65. sender.send(msg).map_err(|e| FlowyError::internal().context(e))?;
  66. }
  67. }
  68. Ok(())
  69. })
  70. }
  71. fn subscribe_state_changed(&self) -> BoxFuture<WSStateReceiver> {
  72. let ws_conn = self.0.clone();
  73. Box::pin(async move { ws_conn.subscribe_websocket_state().await })
  74. }
  75. }