grid_deps.rs 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. use crate::FlowyError;
  2. use bytes::Bytes;
  3. use flowy_database::ConnectionPool;
  4. use flowy_grid::manager::{GridManager, GridUser};
  5. use flowy_grid::services::persistence::GridDatabase;
  6. use flowy_net::ws::connection::FlowyWebSocketConnect;
  7. use flowy_revision::{RevisionWebSocket, WSStateReceiver};
  8. use flowy_sync::entities::ws_data::ClientRevisionWSData;
  9. use flowy_user::services::UserSession;
  10. use futures_core::future::BoxFuture;
  11. use lib_infra::future::BoxResultFuture;
  12. use lib_ws::{WSChannel, WebSocketRawMessage};
  13. use std::convert::TryInto;
  14. use std::sync::Arc;
  15. pub struct GridDepsResolver();
  16. impl GridDepsResolver {
  17. pub fn resolve(ws_conn: Arc<FlowyWebSocketConnect>, user_session: Arc<UserSession>) -> Arc<GridManager> {
  18. let user = Arc::new(GridUserImpl(user_session.clone()));
  19. let rev_web_socket = Arc::new(GridWebSocket(ws_conn));
  20. Arc::new(GridManager::new(
  21. user,
  22. rev_web_socket,
  23. Arc::new(GridDatabaseImpl(user_session)),
  24. ))
  25. }
  26. }
  27. struct GridDatabaseImpl(Arc<UserSession>);
  28. impl GridDatabase for GridDatabaseImpl {
  29. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> {
  30. self.0.db_pool().map_err(|e| FlowyError::internal().context(e))
  31. }
  32. }
  33. struct GridUserImpl(Arc<UserSession>);
  34. impl GridUser for GridUserImpl {
  35. fn user_id(&self) -> Result<String, FlowyError> {
  36. self.0.user_id()
  37. }
  38. fn token(&self) -> Result<String, FlowyError> {
  39. self.0.token()
  40. }
  41. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> {
  42. self.0.db_pool()
  43. }
  44. }
  45. struct GridWebSocket(Arc<FlowyWebSocketConnect>);
  46. impl RevisionWebSocket for GridWebSocket {
  47. fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> {
  48. let bytes: Bytes = data.try_into().unwrap();
  49. let msg = WebSocketRawMessage {
  50. channel: WSChannel::Grid,
  51. data: bytes.to_vec(),
  52. };
  53. let ws_conn = self.0.clone();
  54. Box::pin(async move {
  55. match ws_conn.web_socket().await? {
  56. None => {}
  57. Some(sender) => {
  58. sender.send(msg).map_err(|e| FlowyError::internal().context(e))?;
  59. }
  60. }
  61. Ok(())
  62. })
  63. }
  64. fn subscribe_state_changed(&self) -> BoxFuture<WSStateReceiver> {
  65. let ws_conn = self.0.clone();
  66. Box::pin(async move { ws_conn.subscribe_websocket_state().await })
  67. }
  68. }