server.rs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. use bytes::Bytes;
  2. use dashmap::DashMap;
  3. use flowy_collaboration::{entities::prelude::*, errors::CollaborateError, sync::*};
  4. // use flowy_net::services::ws::*;
  5. use lib_infra::future::BoxResultFuture;
  6. use lib_ws::{WSModule, WebSocketRawMessage};
  7. use std::{
  8. convert::TryInto,
  9. fmt::{Debug, Formatter},
  10. sync::Arc,
  11. };
  12. use tokio::sync::mpsc;
  13. pub struct MockDocServer {
  14. pub manager: Arc<ServerDocumentManager>,
  15. }
  16. impl std::default::Default for MockDocServer {
  17. fn default() -> Self {
  18. let persistence = Arc::new(MockDocServerPersistence::default());
  19. let manager = Arc::new(ServerDocumentManager::new(persistence));
  20. MockDocServer { manager }
  21. }
  22. }
  23. impl MockDocServer {
  24. pub async fn handle_client_data(
  25. &self,
  26. client_data: DocumentClientWSData,
  27. ) -> Option<mpsc::Receiver<WebSocketRawMessage>> {
  28. match client_data.ty {
  29. DocumentClientWSDataType::ClientPushRev => {
  30. let (tx, rx) = mpsc::channel(1);
  31. let user = Arc::new(MockDocUser {
  32. user_id: "fake_user_id".to_owned(),
  33. tx,
  34. });
  35. let pb_client_data: flowy_collaboration::protobuf::DocumentClientWSData =
  36. client_data.try_into().unwrap();
  37. self.manager.apply_revisions(user, pb_client_data).await.unwrap();
  38. Some(rx)
  39. },
  40. }
  41. }
  42. }
  43. struct MockDocServerPersistence {
  44. inner: Arc<DashMap<String, DocumentInfo>>,
  45. }
  46. impl Debug for MockDocServerPersistence {
  47. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("MockDocServerPersistence") }
  48. }
  49. impl std::default::Default for MockDocServerPersistence {
  50. fn default() -> Self {
  51. MockDocServerPersistence {
  52. inner: Arc::new(DashMap::new()),
  53. }
  54. }
  55. }
  56. impl DocumentPersistence for MockDocServerPersistence {
  57. fn read_doc(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
  58. let inner = self.inner.clone();
  59. let doc_id = doc_id.to_owned();
  60. Box::pin(async move {
  61. match inner.get(&doc_id) {
  62. None => {
  63. //
  64. Err(CollaborateError::record_not_found())
  65. },
  66. Some(val) => {
  67. //
  68. Ok(val.value().clone())
  69. },
  70. }
  71. })
  72. }
  73. fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<DocumentInfo, CollaborateError> {
  74. let doc_id = doc_id.to_owned();
  75. Box::pin(async move { DocumentInfo::from_revisions(&doc_id, revisions) })
  76. }
  77. fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec<i64>) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
  78. Box::pin(async move { Ok(vec![]) })
  79. }
  80. fn get_doc_revisions(&self, _doc_id: &str) -> BoxResultFuture<Vec<Revision>, CollaborateError> { unimplemented!() }
  81. }
  82. #[derive(Debug)]
  83. struct MockDocUser {
  84. user_id: String,
  85. tx: mpsc::Sender<WebSocketRawMessage>,
  86. }
  87. impl RevisionUser for MockDocUser {
  88. fn user_id(&self) -> String { self.user_id.clone() }
  89. fn receive(&self, resp: SyncResponse) {
  90. let sender = self.tx.clone();
  91. tokio::spawn(async move {
  92. match resp {
  93. SyncResponse::Pull(data) => {
  94. let bytes: Bytes = data.try_into().unwrap();
  95. let msg = WebSocketRawMessage {
  96. module: WSModule::Doc,
  97. data: bytes.to_vec(),
  98. };
  99. sender.send(msg).await.unwrap();
  100. },
  101. SyncResponse::Push(data) => {
  102. let bytes: Bytes = data.try_into().unwrap();
  103. let msg = WebSocketRawMessage {
  104. module: WSModule::Doc,
  105. data: bytes.to_vec(),
  106. };
  107. sender.send(msg).await.unwrap();
  108. },
  109. SyncResponse::Ack(data) => {
  110. let bytes: Bytes = data.try_into().unwrap();
  111. let msg = WebSocketRawMessage {
  112. module: WSModule::Doc,
  113. data: bytes.to_vec(),
  114. };
  115. sender.send(msg).await.unwrap();
  116. },
  117. SyncResponse::NewRevision(_) => {
  118. // unimplemented!()
  119. },
  120. }
  121. });
  122. }
  123. }