server.rs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. use bytes::Bytes;
  2. use dashmap::DashMap;
  3. use flowy_collaboration::{
  4. entities::{
  5. doc::DocumentInfo,
  6. ws::{DocumentClientWSData, DocumentClientWSDataType},
  7. },
  8. errors::CollaborateError,
  9. protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
  10. sync::*,
  11. util::repeated_revision_from_repeated_revision_pb,
  12. };
  13. use lib_infra::future::BoxResultFuture;
  14. use lib_ws::{WSModule, WebSocketRawMessage};
  15. use std::{
  16. convert::TryInto,
  17. fmt::{Debug, Formatter},
  18. sync::Arc,
  19. };
  20. use tokio::sync::mpsc;
  21. pub struct MockDocServer {
  22. pub manager: Arc<ServerDocumentManager>,
  23. }
  24. impl std::default::Default for MockDocServer {
  25. fn default() -> Self {
  26. let persistence = Arc::new(MockDocServerPersistence::default());
  27. let manager = Arc::new(ServerDocumentManager::new(persistence));
  28. MockDocServer { manager }
  29. }
  30. }
  31. impl MockDocServer {
  32. pub async fn handle_client_data(
  33. &self,
  34. client_data: DocumentClientWSData,
  35. ) -> Option<mpsc::Receiver<WebSocketRawMessage>> {
  36. match client_data.ty {
  37. DocumentClientWSDataType::ClientPushRev => {
  38. let (tx, rx) = mpsc::channel(1);
  39. let user = Arc::new(MockDocUser {
  40. user_id: "fake_user_id".to_owned(),
  41. tx,
  42. });
  43. let pb_client_data: flowy_collaboration::protobuf::DocumentClientWSData =
  44. client_data.try_into().unwrap();
  45. self.manager
  46. .handle_client_revisions(user, pb_client_data)
  47. .await
  48. .unwrap();
  49. Some(rx)
  50. },
  51. DocumentClientWSDataType::ClientPing => {
  52. todo!()
  53. },
  54. }
  55. }
  56. }
  57. struct MockDocServerPersistence {
  58. inner: Arc<DashMap<String, DocumentInfo>>,
  59. }
  60. impl Debug for MockDocServerPersistence {
  61. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("MockDocServerPersistence") }
  62. }
  63. impl std::default::Default for MockDocServerPersistence {
  64. fn default() -> Self {
  65. MockDocServerPersistence {
  66. inner: Arc::new(DashMap::new()),
  67. }
  68. }
  69. }
  70. impl DocumentPersistence for MockDocServerPersistence {
  71. fn read_doc(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
  72. let inner = self.inner.clone();
  73. let doc_id = doc_id.to_owned();
  74. Box::pin(async move {
  75. match inner.get(&doc_id) {
  76. None => {
  77. //
  78. Err(CollaborateError::record_not_found())
  79. },
  80. Some(val) => {
  81. //
  82. Ok(val.value().clone())
  83. },
  84. }
  85. })
  86. }
  87. fn create_doc(
  88. &self,
  89. doc_id: &str,
  90. repeated_revision: RepeatedRevisionPB,
  91. ) -> BoxResultFuture<DocumentInfo, CollaborateError> {
  92. let doc_id = doc_id.to_owned();
  93. Box::pin(async move {
  94. let repeated_revision = repeated_revision_from_repeated_revision_pb(repeated_revision)?;
  95. DocumentInfo::from_revisions(&doc_id, repeated_revision.into_inner())
  96. })
  97. }
  98. fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec<i64>) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
  99. Box::pin(async move { Ok(vec![]) })
  100. }
  101. fn get_doc_revisions(&self, _doc_id: &str) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
  102. unimplemented!()
  103. }
  104. fn reset_document(&self, _doc_id: &str, _revisions: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
  105. unimplemented!()
  106. }
  107. }
  108. #[derive(Debug)]
  109. struct MockDocUser {
  110. user_id: String,
  111. tx: mpsc::Sender<WebSocketRawMessage>,
  112. }
  113. impl RevisionUser for MockDocUser {
  114. fn user_id(&self) -> String { self.user_id.clone() }
  115. fn receive(&self, resp: SyncResponse) {
  116. let sender = self.tx.clone();
  117. tokio::spawn(async move {
  118. match resp {
  119. SyncResponse::Pull(data) => {
  120. let bytes: Bytes = data.try_into().unwrap();
  121. let msg = WebSocketRawMessage {
  122. module: WSModule::Doc,
  123. data: bytes.to_vec(),
  124. };
  125. sender.send(msg).await.unwrap();
  126. },
  127. SyncResponse::Push(data) => {
  128. let bytes: Bytes = data.try_into().unwrap();
  129. let msg = WebSocketRawMessage {
  130. module: WSModule::Doc,
  131. data: bytes.to_vec(),
  132. };
  133. sender.send(msg).await.unwrap();
  134. },
  135. SyncResponse::Ack(data) => {
  136. let bytes: Bytes = data.try_into().unwrap();
  137. let msg = WebSocketRawMessage {
  138. module: WSModule::Doc,
  139. data: bytes.to_vec(),
  140. };
  141. sender.send(msg).await.unwrap();
  142. },
  143. SyncResponse::NewRevision(_) => {
  144. // unimplemented!()
  145. },
  146. }
  147. });
  148. }
  149. }