server.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. use crate::local_server::persistence::LocalDocumentCloudPersistence;
  2. use async_stream::stream;
  3. use bytes::Bytes;
  4. use flowy_error::{internal_error, FlowyError};
  5. use flowy_folder::event_map::FolderCouldServiceV1;
  6. use flowy_sync::{
  7. entities::{
  8. document::{CreateDocumentParams, DocumentIdPB, DocumentPayloadPB, ResetDocumentParams},
  9. ws_data::{ClientRevisionWSData, ClientRevisionWSDataType},
  10. },
  11. errors::CollaborateError,
  12. protobuf::ClientRevisionWSData as ClientRevisionWSDataPB,
  13. server_document::ServerDocumentManager,
  14. server_folder::ServerFolderManager,
  15. synchronizer::{RevisionSyncResponse, RevisionUser},
  16. };
  17. use futures_util::stream::StreamExt;
  18. use lib_ws::{WSChannel, WebSocketRawMessage};
  19. use nanoid::nanoid;
  20. use parking_lot::RwLock;
  21. use std::{
  22. convert::{TryFrom, TryInto},
  23. fmt::Debug,
  24. sync::Arc,
  25. };
  26. use tokio::sync::{broadcast, mpsc, mpsc::UnboundedSender};
  27. pub struct LocalServer {
  28. doc_manager: Arc<ServerDocumentManager>,
  29. folder_manager: Arc<ServerFolderManager>,
  30. stop_tx: RwLock<Option<mpsc::Sender<()>>>,
  31. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  32. client_ws_receiver: broadcast::Sender<WebSocketRawMessage>,
  33. }
  34. impl LocalServer {
  35. pub fn new(
  36. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  37. client_ws_receiver: broadcast::Sender<WebSocketRawMessage>,
  38. ) -> Self {
  39. let persistence = Arc::new(LocalDocumentCloudPersistence::default());
  40. let doc_manager = Arc::new(ServerDocumentManager::new(persistence.clone()));
  41. let folder_manager = Arc::new(ServerFolderManager::new(persistence));
  42. let stop_tx = RwLock::new(None);
  43. LocalServer {
  44. doc_manager,
  45. folder_manager,
  46. stop_tx,
  47. client_ws_sender,
  48. client_ws_receiver,
  49. }
  50. }
  51. pub async fn stop(&self) {
  52. if let Some(stop_tx) = self.stop_tx.read().clone() {
  53. let _ = stop_tx.send(()).await;
  54. }
  55. }
  56. pub fn run(&self) {
  57. let (stop_tx, stop_rx) = mpsc::channel(1);
  58. *self.stop_tx.write() = Some(stop_tx);
  59. let runner = LocalWebSocketRunner {
  60. doc_manager: self.doc_manager.clone(),
  61. folder_manager: self.folder_manager.clone(),
  62. stop_rx: Some(stop_rx),
  63. client_ws_sender: self.client_ws_sender.clone(),
  64. client_ws_receiver: Some(self.client_ws_receiver.subscribe()),
  65. };
  66. tokio::spawn(runner.run());
  67. }
  68. }
  69. struct LocalWebSocketRunner {
  70. doc_manager: Arc<ServerDocumentManager>,
  71. folder_manager: Arc<ServerFolderManager>,
  72. stop_rx: Option<mpsc::Receiver<()>>,
  73. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  74. client_ws_receiver: Option<broadcast::Receiver<WebSocketRawMessage>>,
  75. }
  76. impl LocalWebSocketRunner {
  77. pub async fn run(mut self) {
  78. let mut stop_rx = self.stop_rx.take().expect("Only run once");
  79. let mut client_ws_receiver = self.client_ws_receiver.take().expect("Only run once");
  80. let stream = stream! {
  81. loop {
  82. tokio::select! {
  83. result = client_ws_receiver.recv() => {
  84. match result {
  85. Ok(msg) => yield msg,
  86. Err(_e) => {},
  87. }
  88. },
  89. _ = stop_rx.recv() => {
  90. tracing::trace!("[LocalWebSocketRunner] stop");
  91. break
  92. },
  93. };
  94. }
  95. };
  96. stream
  97. .for_each(|message| async {
  98. match self.handle_message(message).await {
  99. Ok(_) => {}
  100. Err(e) => tracing::error!("[LocalWebSocketRunner]: {}", e),
  101. }
  102. })
  103. .await;
  104. }
  105. async fn handle_message(&self, message: WebSocketRawMessage) -> Result<(), FlowyError> {
  106. let bytes = Bytes::from(message.data);
  107. let client_data = ClientRevisionWSData::try_from(bytes).map_err(internal_error)?;
  108. match message.channel {
  109. WSChannel::Document => {
  110. let _ = self.handle_document_client_data(client_data, "".to_owned()).await?;
  111. Ok(())
  112. }
  113. WSChannel::Folder => {
  114. let _ = self.handle_folder_client_data(client_data, "".to_owned()).await?;
  115. Ok(())
  116. }
  117. WSChannel::Grid => {
  118. todo!("Implement grid web socket channel")
  119. }
  120. }
  121. }
  122. pub async fn handle_folder_client_data(
  123. &self,
  124. client_data: ClientRevisionWSData,
  125. user_id: String,
  126. ) -> Result<(), CollaborateError> {
  127. tracing::trace!(
  128. "[LocalFolderServer] receive: {}:{}-{:?} ",
  129. client_data.object_id,
  130. client_data.id(),
  131. client_data.ty,
  132. );
  133. let client_ws_sender = self.client_ws_sender.clone();
  134. let user = Arc::new(LocalRevisionUser {
  135. user_id,
  136. client_ws_sender,
  137. channel: WSChannel::Folder,
  138. });
  139. let ty = client_data.ty.clone();
  140. let document_client_data: ClientRevisionWSDataPB = client_data.try_into().unwrap();
  141. match ty {
  142. ClientRevisionWSDataType::ClientPushRev => {
  143. let _ = self
  144. .folder_manager
  145. .handle_client_revisions(user, document_client_data)
  146. .await?;
  147. }
  148. ClientRevisionWSDataType::ClientPing => {
  149. let _ = self
  150. .folder_manager
  151. .handle_client_ping(user, document_client_data)
  152. .await?;
  153. }
  154. }
  155. Ok(())
  156. }
  157. pub async fn handle_document_client_data(
  158. &self,
  159. client_data: ClientRevisionWSData,
  160. user_id: String,
  161. ) -> Result<(), CollaborateError> {
  162. tracing::trace!(
  163. "[LocalDocumentServer] receive: {}:{}-{:?} ",
  164. client_data.object_id,
  165. client_data.id(),
  166. client_data.ty,
  167. );
  168. let client_ws_sender = self.client_ws_sender.clone();
  169. let user = Arc::new(LocalRevisionUser {
  170. user_id,
  171. client_ws_sender,
  172. channel: WSChannel::Document,
  173. });
  174. let ty = client_data.ty.clone();
  175. let document_client_data: ClientRevisionWSDataPB = client_data.try_into().unwrap();
  176. match ty {
  177. ClientRevisionWSDataType::ClientPushRev => {
  178. let _ = self
  179. .doc_manager
  180. .handle_client_revisions(user, document_client_data)
  181. .await?;
  182. }
  183. ClientRevisionWSDataType::ClientPing => {
  184. let _ = self.doc_manager.handle_client_ping(user, document_client_data).await?;
  185. }
  186. }
  187. Ok(())
  188. }
  189. }
  190. #[derive(Debug)]
  191. struct LocalRevisionUser {
  192. user_id: String,
  193. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  194. channel: WSChannel,
  195. }
  196. impl RevisionUser for LocalRevisionUser {
  197. fn user_id(&self) -> String {
  198. self.user_id.clone()
  199. }
  200. fn receive(&self, resp: RevisionSyncResponse) {
  201. let sender = self.client_ws_sender.clone();
  202. let send_fn = |sender: UnboundedSender<WebSocketRawMessage>, msg: WebSocketRawMessage| match sender.send(msg) {
  203. Ok(_) => {}
  204. Err(e) => {
  205. tracing::error!("LocalDocumentUser send message failed: {}", e);
  206. }
  207. };
  208. let channel = self.channel.clone();
  209. tokio::spawn(async move {
  210. match resp {
  211. RevisionSyncResponse::Pull(data) => {
  212. let bytes: Bytes = data.try_into().unwrap();
  213. let msg = WebSocketRawMessage {
  214. channel,
  215. data: bytes.to_vec(),
  216. };
  217. send_fn(sender, msg);
  218. }
  219. RevisionSyncResponse::Push(data) => {
  220. let bytes: Bytes = data.try_into().unwrap();
  221. let msg = WebSocketRawMessage {
  222. channel,
  223. data: bytes.to_vec(),
  224. };
  225. send_fn(sender, msg);
  226. }
  227. RevisionSyncResponse::Ack(data) => {
  228. let bytes: Bytes = data.try_into().unwrap();
  229. let msg = WebSocketRawMessage {
  230. channel,
  231. data: bytes.to_vec(),
  232. };
  233. send_fn(sender, msg);
  234. }
  235. }
  236. });
  237. }
  238. }
  239. use flowy_document::DocumentCloudService;
  240. use flowy_folder::entities::{
  241. app::{AppIdPB, CreateAppParams, UpdateAppParams},
  242. trash::RepeatedTrashIdPB,
  243. view::{CreateViewParams, RepeatedViewIdPB, UpdateViewParams, ViewIdPB},
  244. workspace::{CreateWorkspaceParams, UpdateWorkspaceParams, WorkspaceIdPB},
  245. };
  246. use flowy_folder_data_model::revision::{
  247. gen_app_id, gen_workspace_id, AppRevision, TrashRevision, ViewRevision, WorkspaceRevision,
  248. };
  249. use flowy_user::entities::{
  250. SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams, UserProfilePB,
  251. };
  252. use flowy_user::event_map::UserCloudService;
  253. use lib_infra::{future::FutureResult, util::timestamp};
  254. impl FolderCouldServiceV1 for LocalServer {
  255. fn init(&self) {}
  256. fn create_workspace(
  257. &self,
  258. _token: &str,
  259. params: CreateWorkspaceParams,
  260. ) -> FutureResult<WorkspaceRevision, FlowyError> {
  261. let time = timestamp();
  262. let workspace = WorkspaceRevision {
  263. id: gen_workspace_id(),
  264. name: params.name,
  265. desc: params.desc,
  266. apps: vec![],
  267. modified_time: time,
  268. create_time: time,
  269. };
  270. FutureResult::new(async { Ok(workspace) })
  271. }
  272. fn read_workspace(&self, _token: &str, _params: WorkspaceIdPB) -> FutureResult<Vec<WorkspaceRevision>, FlowyError> {
  273. FutureResult::new(async { Ok(vec![]) })
  274. }
  275. fn update_workspace(&self, _token: &str, _params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
  276. FutureResult::new(async { Ok(()) })
  277. }
  278. fn delete_workspace(&self, _token: &str, _params: WorkspaceIdPB) -> FutureResult<(), FlowyError> {
  279. FutureResult::new(async { Ok(()) })
  280. }
  281. fn create_view(&self, _token: &str, params: CreateViewParams) -> FutureResult<ViewRevision, FlowyError> {
  282. let time = timestamp();
  283. let view = ViewRevision {
  284. id: params.view_id,
  285. app_id: params.belong_to_id,
  286. name: params.name,
  287. desc: params.desc,
  288. data_type: params.data_type.into(),
  289. version: 0,
  290. belongings: vec![],
  291. modified_time: time,
  292. create_time: time,
  293. ext_data: "".to_string(),
  294. thumbnail: params.thumbnail,
  295. layout: params.layout.into(),
  296. };
  297. FutureResult::new(async { Ok(view) })
  298. }
  299. fn read_view(&self, _token: &str, _params: ViewIdPB) -> FutureResult<Option<ViewRevision>, FlowyError> {
  300. FutureResult::new(async { Ok(None) })
  301. }
  302. fn delete_view(&self, _token: &str, _params: RepeatedViewIdPB) -> FutureResult<(), FlowyError> {
  303. FutureResult::new(async { Ok(()) })
  304. }
  305. fn update_view(&self, _token: &str, _params: UpdateViewParams) -> FutureResult<(), FlowyError> {
  306. FutureResult::new(async { Ok(()) })
  307. }
  308. fn create_app(&self, _token: &str, params: CreateAppParams) -> FutureResult<AppRevision, FlowyError> {
  309. let time = timestamp();
  310. let app = AppRevision {
  311. id: gen_app_id(),
  312. workspace_id: params.workspace_id,
  313. name: params.name,
  314. desc: params.desc,
  315. belongings: vec![],
  316. version: 0,
  317. modified_time: time,
  318. create_time: time,
  319. };
  320. FutureResult::new(async { Ok(app) })
  321. }
  322. fn read_app(&self, _token: &str, _params: AppIdPB) -> FutureResult<Option<AppRevision>, FlowyError> {
  323. FutureResult::new(async { Ok(None) })
  324. }
  325. fn update_app(&self, _token: &str, _params: UpdateAppParams) -> FutureResult<(), FlowyError> {
  326. FutureResult::new(async { Ok(()) })
  327. }
  328. fn delete_app(&self, _token: &str, _params: AppIdPB) -> FutureResult<(), FlowyError> {
  329. FutureResult::new(async { Ok(()) })
  330. }
  331. fn create_trash(&self, _token: &str, _params: RepeatedTrashIdPB) -> FutureResult<(), FlowyError> {
  332. FutureResult::new(async { Ok(()) })
  333. }
  334. fn delete_trash(&self, _token: &str, _params: RepeatedTrashIdPB) -> FutureResult<(), FlowyError> {
  335. FutureResult::new(async { Ok(()) })
  336. }
  337. fn read_trash(&self, _token: &str) -> FutureResult<Vec<TrashRevision>, FlowyError> {
  338. FutureResult::new(async { Ok(vec![]) })
  339. }
  340. }
  341. impl UserCloudService for LocalServer {
  342. fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> {
  343. let uid = nanoid!(20);
  344. FutureResult::new(async move {
  345. Ok(SignUpResponse {
  346. user_id: uid.clone(),
  347. name: params.name,
  348. email: params.email,
  349. token: uid,
  350. })
  351. })
  352. }
  353. fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> {
  354. let user_id = nanoid!(20);
  355. FutureResult::new(async {
  356. Ok(SignInResponse {
  357. user_id: user_id.clone(),
  358. name: params.name,
  359. email: params.email,
  360. token: user_id,
  361. })
  362. })
  363. }
  364. fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> {
  365. FutureResult::new(async { Ok(()) })
  366. }
  367. fn update_user(&self, _token: &str, _params: UpdateUserProfileParams) -> FutureResult<(), FlowyError> {
  368. FutureResult::new(async { Ok(()) })
  369. }
  370. fn get_user(&self, _token: &str) -> FutureResult<UserProfilePB, FlowyError> {
  371. FutureResult::new(async { Ok(UserProfilePB::default()) })
  372. }
  373. fn ws_addr(&self) -> String {
  374. "ws://localhost:8000/ws/".to_owned()
  375. }
  376. }
  377. impl DocumentCloudService for LocalServer {
  378. fn create_document(&self, _token: &str, _params: CreateDocumentParams) -> FutureResult<(), FlowyError> {
  379. FutureResult::new(async { Ok(()) })
  380. }
  381. fn fetch_document(
  382. &self,
  383. _token: &str,
  384. _params: DocumentIdPB,
  385. ) -> FutureResult<Option<DocumentPayloadPB>, FlowyError> {
  386. FutureResult::new(async { Ok(None) })
  387. }
  388. fn update_document_content(&self, _token: &str, _params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
  389. FutureResult::new(async { Ok(()) })
  390. }
  391. }