server.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. use crate::local_server::persistence::LocalDocumentCloudPersistence;
  2. use async_stream::stream;
  3. use bytes::Bytes;
  4. use flowy_collaboration::{
  5. client_document::default::initial_delta_string,
  6. entities::{
  7. document_info::{BlockId, BlockInfo, CreateBlockParams, ResetDocumentParams},
  8. ws_data::{ClientRevisionWSData, ClientRevisionWSDataType},
  9. },
  10. errors::CollaborateError,
  11. protobuf::ClientRevisionWSData as ClientRevisionWSDataPB,
  12. server_document::ServerDocumentManager,
  13. server_folder::ServerFolderManager,
  14. synchronizer::{RevisionSyncResponse, RevisionUser},
  15. };
  16. use flowy_error::{internal_error, FlowyError};
  17. use flowy_folder::event_map::FolderCouldServiceV1;
  18. use futures_util::stream::StreamExt;
  19. use lib_ws::{WSChannel, WebSocketRawMessage};
  20. use parking_lot::RwLock;
  21. use std::{
  22. convert::{TryFrom, TryInto},
  23. fmt::Debug,
  24. sync::Arc,
  25. };
  26. use tokio::sync::{broadcast, mpsc, mpsc::UnboundedSender};
  27. pub struct LocalServer {
  28. doc_manager: Arc<ServerDocumentManager>,
  29. folder_manager: Arc<ServerFolderManager>,
  30. stop_tx: RwLock<Option<mpsc::Sender<()>>>,
  31. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  32. client_ws_receiver: broadcast::Sender<WebSocketRawMessage>,
  33. }
  34. impl LocalServer {
  35. pub fn new(
  36. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  37. client_ws_receiver: broadcast::Sender<WebSocketRawMessage>,
  38. ) -> Self {
  39. let persistence = Arc::new(LocalDocumentCloudPersistence::default());
  40. let doc_manager = Arc::new(ServerDocumentManager::new(persistence.clone()));
  41. let folder_manager = Arc::new(ServerFolderManager::new(persistence));
  42. let stop_tx = RwLock::new(None);
  43. LocalServer {
  44. doc_manager,
  45. folder_manager,
  46. stop_tx,
  47. client_ws_sender,
  48. client_ws_receiver,
  49. }
  50. }
  51. pub async fn stop(&self) {
  52. if let Some(stop_tx) = self.stop_tx.read().clone() {
  53. let _ = stop_tx.send(()).await;
  54. }
  55. }
  56. pub fn run(&self) {
  57. let (stop_tx, stop_rx) = mpsc::channel(1);
  58. *self.stop_tx.write() = Some(stop_tx);
  59. let runner = LocalWebSocketRunner {
  60. doc_manager: self.doc_manager.clone(),
  61. folder_manager: self.folder_manager.clone(),
  62. stop_rx: Some(stop_rx),
  63. client_ws_sender: self.client_ws_sender.clone(),
  64. client_ws_receiver: Some(self.client_ws_receiver.subscribe()),
  65. };
  66. tokio::spawn(runner.run());
  67. }
  68. }
  69. struct LocalWebSocketRunner {
  70. doc_manager: Arc<ServerDocumentManager>,
  71. folder_manager: Arc<ServerFolderManager>,
  72. stop_rx: Option<mpsc::Receiver<()>>,
  73. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  74. client_ws_receiver: Option<broadcast::Receiver<WebSocketRawMessage>>,
  75. }
  76. impl LocalWebSocketRunner {
  77. pub async fn run(mut self) {
  78. let mut stop_rx = self.stop_rx.take().expect("Only run once");
  79. let mut client_ws_receiver = self.client_ws_receiver.take().expect("Only run once");
  80. let stream = stream! {
  81. loop {
  82. tokio::select! {
  83. result = client_ws_receiver.recv() => {
  84. match result {
  85. Ok(msg) => yield msg,
  86. Err(_e) => {},
  87. }
  88. },
  89. _ = stop_rx.recv() => {
  90. tracing::trace!("[LocalWebSocketRunner] stop");
  91. break
  92. },
  93. };
  94. }
  95. };
  96. stream
  97. .for_each(|message| async {
  98. match self.handle_message(message).await {
  99. Ok(_) => {}
  100. Err(e) => tracing::error!("[LocalWebSocketRunner]: {}", e),
  101. }
  102. })
  103. .await;
  104. }
  105. async fn handle_message(&self, message: WebSocketRawMessage) -> Result<(), FlowyError> {
  106. let bytes = Bytes::from(message.data);
  107. let client_data = ClientRevisionWSData::try_from(bytes).map_err(internal_error)?;
  108. match message.channel {
  109. WSChannel::Document => {
  110. let _ = self.handle_document_client_data(client_data, "".to_owned()).await?;
  111. Ok(())
  112. }
  113. WSChannel::Folder => {
  114. let _ = self.handle_folder_client_data(client_data, "".to_owned()).await?;
  115. Ok(())
  116. }
  117. }
  118. }
  119. pub async fn handle_folder_client_data(
  120. &self,
  121. client_data: ClientRevisionWSData,
  122. user_id: String,
  123. ) -> Result<(), CollaborateError> {
  124. tracing::trace!(
  125. "[LocalFolderServer] receive: {}:{}-{:?} ",
  126. client_data.object_id,
  127. client_data.id(),
  128. client_data.ty,
  129. );
  130. let client_ws_sender = self.client_ws_sender.clone();
  131. let user = Arc::new(LocalRevisionUser {
  132. user_id,
  133. client_ws_sender,
  134. channel: WSChannel::Folder,
  135. });
  136. let ty = client_data.ty.clone();
  137. let document_client_data: ClientRevisionWSDataPB = client_data.try_into().unwrap();
  138. match ty {
  139. ClientRevisionWSDataType::ClientPushRev => {
  140. let _ = self
  141. .folder_manager
  142. .handle_client_revisions(user, document_client_data)
  143. .await?;
  144. }
  145. ClientRevisionWSDataType::ClientPing => {
  146. let _ = self
  147. .folder_manager
  148. .handle_client_ping(user, document_client_data)
  149. .await?;
  150. }
  151. }
  152. Ok(())
  153. }
  154. pub async fn handle_document_client_data(
  155. &self,
  156. client_data: ClientRevisionWSData,
  157. user_id: String,
  158. ) -> Result<(), CollaborateError> {
  159. tracing::trace!(
  160. "[LocalDocumentServer] receive: {}:{}-{:?} ",
  161. client_data.object_id,
  162. client_data.id(),
  163. client_data.ty,
  164. );
  165. let client_ws_sender = self.client_ws_sender.clone();
  166. let user = Arc::new(LocalRevisionUser {
  167. user_id,
  168. client_ws_sender,
  169. channel: WSChannel::Document,
  170. });
  171. let ty = client_data.ty.clone();
  172. let document_client_data: ClientRevisionWSDataPB = client_data.try_into().unwrap();
  173. match ty {
  174. ClientRevisionWSDataType::ClientPushRev => {
  175. let _ = self
  176. .doc_manager
  177. .handle_client_revisions(user, document_client_data)
  178. .await?;
  179. }
  180. ClientRevisionWSDataType::ClientPing => {
  181. let _ = self.doc_manager.handle_client_ping(user, document_client_data).await?;
  182. }
  183. }
  184. Ok(())
  185. }
  186. }
  187. #[derive(Debug)]
  188. struct LocalRevisionUser {
  189. user_id: String,
  190. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  191. channel: WSChannel,
  192. }
  193. impl RevisionUser for LocalRevisionUser {
  194. fn user_id(&self) -> String {
  195. self.user_id.clone()
  196. }
  197. fn receive(&self, resp: RevisionSyncResponse) {
  198. let sender = self.client_ws_sender.clone();
  199. let send_fn = |sender: UnboundedSender<WebSocketRawMessage>, msg: WebSocketRawMessage| match sender.send(msg) {
  200. Ok(_) => {}
  201. Err(e) => {
  202. tracing::error!("LocalDocumentUser send message failed: {}", e);
  203. }
  204. };
  205. let channel = self.channel.clone();
  206. tokio::spawn(async move {
  207. match resp {
  208. RevisionSyncResponse::Pull(data) => {
  209. let bytes: Bytes = data.try_into().unwrap();
  210. let msg = WebSocketRawMessage {
  211. channel,
  212. data: bytes.to_vec(),
  213. };
  214. send_fn(sender, msg);
  215. }
  216. RevisionSyncResponse::Push(data) => {
  217. let bytes: Bytes = data.try_into().unwrap();
  218. let msg = WebSocketRawMessage {
  219. channel,
  220. data: bytes.to_vec(),
  221. };
  222. send_fn(sender, msg);
  223. }
  224. RevisionSyncResponse::Ack(data) => {
  225. let bytes: Bytes = data.try_into().unwrap();
  226. let msg = WebSocketRawMessage {
  227. channel,
  228. data: bytes.to_vec(),
  229. };
  230. send_fn(sender, msg);
  231. }
  232. }
  233. });
  234. }
  235. }
  236. use flowy_document::BlockCloudService;
  237. use flowy_folder_data_model::entities::{
  238. app::{App, AppId, CreateAppParams, RepeatedApp, UpdateAppParams},
  239. trash::{RepeatedTrash, RepeatedTrashId},
  240. view::{CreateViewParams, RepeatedView, RepeatedViewId, UpdateViewParams, View, ViewId},
  241. workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId},
  242. };
  243. use flowy_user::event_map::UserCloudService;
  244. use flowy_user_data_model::entities::{
  245. SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile,
  246. };
  247. use lib_infra::{future::FutureResult, timestamp, uuid_string};
  248. impl FolderCouldServiceV1 for LocalServer {
  249. fn init(&self) {}
  250. fn create_workspace(&self, _token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError> {
  251. let time = timestamp();
  252. let workspace = Workspace {
  253. id: uuid_string(),
  254. name: params.name,
  255. desc: params.desc,
  256. apps: RepeatedApp::default(),
  257. modified_time: time,
  258. create_time: time,
  259. };
  260. FutureResult::new(async { Ok(workspace) })
  261. }
  262. fn read_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError> {
  263. FutureResult::new(async {
  264. let repeated_workspace = RepeatedWorkspace { items: vec![] };
  265. Ok(repeated_workspace)
  266. })
  267. }
  268. fn update_workspace(&self, _token: &str, _params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
  269. FutureResult::new(async { Ok(()) })
  270. }
  271. fn delete_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<(), FlowyError> {
  272. FutureResult::new(async { Ok(()) })
  273. }
  274. fn create_view(&self, _token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError> {
  275. let time = timestamp();
  276. let view = View {
  277. id: params.view_id,
  278. belong_to_id: params.belong_to_id,
  279. name: params.name,
  280. desc: params.desc,
  281. data_type: params.data_type,
  282. version: 0,
  283. belongings: RepeatedView::default(),
  284. modified_time: time,
  285. create_time: time,
  286. ext_data: params.ext_data,
  287. thumbnail: params.thumbnail,
  288. };
  289. FutureResult::new(async { Ok(view) })
  290. }
  291. fn read_view(&self, _token: &str, _params: ViewId) -> FutureResult<Option<View>, FlowyError> {
  292. FutureResult::new(async { Ok(None) })
  293. }
  294. fn delete_view(&self, _token: &str, _params: RepeatedViewId) -> FutureResult<(), FlowyError> {
  295. FutureResult::new(async { Ok(()) })
  296. }
  297. fn update_view(&self, _token: &str, _params: UpdateViewParams) -> FutureResult<(), FlowyError> {
  298. FutureResult::new(async { Ok(()) })
  299. }
  300. fn create_app(&self, _token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError> {
  301. let time = timestamp();
  302. let app = App {
  303. id: uuid_string(),
  304. workspace_id: params.workspace_id,
  305. name: params.name,
  306. desc: params.desc,
  307. belongings: RepeatedView::default(),
  308. version: 0,
  309. modified_time: time,
  310. create_time: time,
  311. };
  312. FutureResult::new(async { Ok(app) })
  313. }
  314. fn read_app(&self, _token: &str, _params: AppId) -> FutureResult<Option<App>, FlowyError> {
  315. FutureResult::new(async { Ok(None) })
  316. }
  317. fn update_app(&self, _token: &str, _params: UpdateAppParams) -> FutureResult<(), FlowyError> {
  318. FutureResult::new(async { Ok(()) })
  319. }
  320. fn delete_app(&self, _token: &str, _params: AppId) -> FutureResult<(), FlowyError> {
  321. FutureResult::new(async { Ok(()) })
  322. }
  323. fn create_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
  324. FutureResult::new(async { Ok(()) })
  325. }
  326. fn delete_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
  327. FutureResult::new(async { Ok(()) })
  328. }
  329. fn read_trash(&self, _token: &str) -> FutureResult<RepeatedTrash, FlowyError> {
  330. FutureResult::new(async {
  331. let repeated_trash = RepeatedTrash { items: vec![] };
  332. Ok(repeated_trash)
  333. })
  334. }
  335. }
  336. impl UserCloudService for LocalServer {
  337. fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> {
  338. let uid = uuid_string();
  339. FutureResult::new(async move {
  340. Ok(SignUpResponse {
  341. user_id: uid.clone(),
  342. name: params.name,
  343. email: params.email,
  344. token: uid,
  345. })
  346. })
  347. }
  348. fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> {
  349. let user_id = uuid_string();
  350. FutureResult::new(async {
  351. Ok(SignInResponse {
  352. user_id: user_id.clone(),
  353. name: params.name,
  354. email: params.email,
  355. token: user_id,
  356. })
  357. })
  358. }
  359. fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> {
  360. FutureResult::new(async { Ok(()) })
  361. }
  362. fn update_user(&self, _token: &str, _params: UpdateUserParams) -> FutureResult<(), FlowyError> {
  363. FutureResult::new(async { Ok(()) })
  364. }
  365. fn get_user(&self, _token: &str) -> FutureResult<UserProfile, FlowyError> {
  366. FutureResult::new(async { Ok(UserProfile::default()) })
  367. }
  368. fn ws_addr(&self) -> String {
  369. "ws://localhost:8000/ws/".to_owned()
  370. }
  371. }
  372. impl BlockCloudService for LocalServer {
  373. fn create_block(&self, _token: &str, _params: CreateBlockParams) -> FutureResult<(), FlowyError> {
  374. FutureResult::new(async { Ok(()) })
  375. }
  376. fn read_block(&self, _token: &str, params: BlockId) -> FutureResult<Option<BlockInfo>, FlowyError> {
  377. let doc = BlockInfo {
  378. doc_id: params.value,
  379. text: initial_delta_string(),
  380. rev_id: 0,
  381. base_rev_id: 0,
  382. };
  383. FutureResult::new(async { Ok(Some(doc)) })
  384. }
  385. fn update_block(&self, _token: &str, _params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
  386. FutureResult::new(async { Ok(()) })
  387. }
  388. }