server.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. use crate::local_server::persistence::LocalTextBlockCloudPersistence;
  2. use async_stream::stream;
  3. use bytes::Bytes;
  4. use flowy_error::{internal_error, FlowyError};
  5. use flowy_folder::event_map::FolderCouldServiceV1;
  6. use flowy_sync::{
  7. client_document::default::initial_quill_delta_string,
  8. entities::{
  9. text_block::{CreateTextBlockParams, ResetTextBlockParams, TextBlockId, TextBlockInfo},
  10. ws_data::{ClientRevisionWSData, ClientRevisionWSDataType},
  11. },
  12. errors::CollaborateError,
  13. protobuf::ClientRevisionWSData as ClientRevisionWSDataPB,
  14. server_document::ServerDocumentManager,
  15. server_folder::ServerFolderManager,
  16. synchronizer::{RevisionSyncResponse, RevisionUser},
  17. };
  18. use futures_util::stream::StreamExt;
  19. use lib_ws::{WSChannel, WebSocketRawMessage};
  20. use nanoid::nanoid;
  21. use parking_lot::RwLock;
  22. use std::{
  23. convert::{TryFrom, TryInto},
  24. fmt::Debug,
  25. sync::Arc,
  26. };
  27. use tokio::sync::{broadcast, mpsc, mpsc::UnboundedSender};
  28. pub struct LocalServer {
  29. doc_manager: Arc<ServerDocumentManager>,
  30. folder_manager: Arc<ServerFolderManager>,
  31. stop_tx: RwLock<Option<mpsc::Sender<()>>>,
  32. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  33. client_ws_receiver: broadcast::Sender<WebSocketRawMessage>,
  34. }
  35. impl LocalServer {
  36. pub fn new(
  37. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  38. client_ws_receiver: broadcast::Sender<WebSocketRawMessage>,
  39. ) -> Self {
  40. let persistence = Arc::new(LocalTextBlockCloudPersistence::default());
  41. let doc_manager = Arc::new(ServerDocumentManager::new(persistence.clone()));
  42. let folder_manager = Arc::new(ServerFolderManager::new(persistence));
  43. let stop_tx = RwLock::new(None);
  44. LocalServer {
  45. doc_manager,
  46. folder_manager,
  47. stop_tx,
  48. client_ws_sender,
  49. client_ws_receiver,
  50. }
  51. }
  52. pub async fn stop(&self) {
  53. if let Some(stop_tx) = self.stop_tx.read().clone() {
  54. let _ = stop_tx.send(()).await;
  55. }
  56. }
  57. pub fn run(&self) {
  58. let (stop_tx, stop_rx) = mpsc::channel(1);
  59. *self.stop_tx.write() = Some(stop_tx);
  60. let runner = LocalWebSocketRunner {
  61. doc_manager: self.doc_manager.clone(),
  62. folder_manager: self.folder_manager.clone(),
  63. stop_rx: Some(stop_rx),
  64. client_ws_sender: self.client_ws_sender.clone(),
  65. client_ws_receiver: Some(self.client_ws_receiver.subscribe()),
  66. };
  67. tokio::spawn(runner.run());
  68. }
  69. }
  70. struct LocalWebSocketRunner {
  71. doc_manager: Arc<ServerDocumentManager>,
  72. folder_manager: Arc<ServerFolderManager>,
  73. stop_rx: Option<mpsc::Receiver<()>>,
  74. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  75. client_ws_receiver: Option<broadcast::Receiver<WebSocketRawMessage>>,
  76. }
  77. impl LocalWebSocketRunner {
  78. pub async fn run(mut self) {
  79. let mut stop_rx = self.stop_rx.take().expect("Only run once");
  80. let mut client_ws_receiver = self.client_ws_receiver.take().expect("Only run once");
  81. let stream = stream! {
  82. loop {
  83. tokio::select! {
  84. result = client_ws_receiver.recv() => {
  85. match result {
  86. Ok(msg) => yield msg,
  87. Err(_e) => {},
  88. }
  89. },
  90. _ = stop_rx.recv() => {
  91. tracing::trace!("[LocalWebSocketRunner] stop");
  92. break
  93. },
  94. };
  95. }
  96. };
  97. stream
  98. .for_each(|message| async {
  99. match self.handle_message(message).await {
  100. Ok(_) => {}
  101. Err(e) => tracing::error!("[LocalWebSocketRunner]: {}", e),
  102. }
  103. })
  104. .await;
  105. }
  106. async fn handle_message(&self, message: WebSocketRawMessage) -> Result<(), FlowyError> {
  107. let bytes = Bytes::from(message.data);
  108. let client_data = ClientRevisionWSData::try_from(bytes).map_err(internal_error)?;
  109. match message.channel {
  110. WSChannel::Document => {
  111. let _ = self.handle_document_client_data(client_data, "".to_owned()).await?;
  112. Ok(())
  113. }
  114. WSChannel::Folder => {
  115. let _ = self.handle_folder_client_data(client_data, "".to_owned()).await?;
  116. Ok(())
  117. }
  118. WSChannel::Grid => {
  119. todo!("Implement grid web socket channel")
  120. }
  121. }
  122. }
  123. pub async fn handle_folder_client_data(
  124. &self,
  125. client_data: ClientRevisionWSData,
  126. user_id: String,
  127. ) -> Result<(), CollaborateError> {
  128. tracing::trace!(
  129. "[LocalFolderServer] receive: {}:{}-{:?} ",
  130. client_data.object_id,
  131. client_data.id(),
  132. client_data.ty,
  133. );
  134. let client_ws_sender = self.client_ws_sender.clone();
  135. let user = Arc::new(LocalRevisionUser {
  136. user_id,
  137. client_ws_sender,
  138. channel: WSChannel::Folder,
  139. });
  140. let ty = client_data.ty.clone();
  141. let document_client_data: ClientRevisionWSDataPB = client_data.try_into().unwrap();
  142. match ty {
  143. ClientRevisionWSDataType::ClientPushRev => {
  144. let _ = self
  145. .folder_manager
  146. .handle_client_revisions(user, document_client_data)
  147. .await?;
  148. }
  149. ClientRevisionWSDataType::ClientPing => {
  150. let _ = self
  151. .folder_manager
  152. .handle_client_ping(user, document_client_data)
  153. .await?;
  154. }
  155. }
  156. Ok(())
  157. }
  158. pub async fn handle_document_client_data(
  159. &self,
  160. client_data: ClientRevisionWSData,
  161. user_id: String,
  162. ) -> Result<(), CollaborateError> {
  163. tracing::trace!(
  164. "[LocalDocumentServer] receive: {}:{}-{:?} ",
  165. client_data.object_id,
  166. client_data.id(),
  167. client_data.ty,
  168. );
  169. let client_ws_sender = self.client_ws_sender.clone();
  170. let user = Arc::new(LocalRevisionUser {
  171. user_id,
  172. client_ws_sender,
  173. channel: WSChannel::Document,
  174. });
  175. let ty = client_data.ty.clone();
  176. let document_client_data: ClientRevisionWSDataPB = client_data.try_into().unwrap();
  177. match ty {
  178. ClientRevisionWSDataType::ClientPushRev => {
  179. let _ = self
  180. .doc_manager
  181. .handle_client_revisions(user, document_client_data)
  182. .await?;
  183. }
  184. ClientRevisionWSDataType::ClientPing => {
  185. let _ = self.doc_manager.handle_client_ping(user, document_client_data).await?;
  186. }
  187. }
  188. Ok(())
  189. }
  190. }
  191. #[derive(Debug)]
  192. struct LocalRevisionUser {
  193. user_id: String,
  194. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  195. channel: WSChannel,
  196. }
  197. impl RevisionUser for LocalRevisionUser {
  198. fn user_id(&self) -> String {
  199. self.user_id.clone()
  200. }
  201. fn receive(&self, resp: RevisionSyncResponse) {
  202. let sender = self.client_ws_sender.clone();
  203. let send_fn = |sender: UnboundedSender<WebSocketRawMessage>, msg: WebSocketRawMessage| match sender.send(msg) {
  204. Ok(_) => {}
  205. Err(e) => {
  206. tracing::error!("LocalDocumentUser send message failed: {}", e);
  207. }
  208. };
  209. let channel = self.channel.clone();
  210. tokio::spawn(async move {
  211. match resp {
  212. RevisionSyncResponse::Pull(data) => {
  213. let bytes: Bytes = data.try_into().unwrap();
  214. let msg = WebSocketRawMessage {
  215. channel,
  216. data: bytes.to_vec(),
  217. };
  218. send_fn(sender, msg);
  219. }
  220. RevisionSyncResponse::Push(data) => {
  221. let bytes: Bytes = data.try_into().unwrap();
  222. let msg = WebSocketRawMessage {
  223. channel,
  224. data: bytes.to_vec(),
  225. };
  226. send_fn(sender, msg);
  227. }
  228. RevisionSyncResponse::Ack(data) => {
  229. let bytes: Bytes = data.try_into().unwrap();
  230. let msg = WebSocketRawMessage {
  231. channel,
  232. data: bytes.to_vec(),
  233. };
  234. send_fn(sender, msg);
  235. }
  236. }
  237. });
  238. }
  239. }
  240. use flowy_folder::entities::{
  241. app::{AppId, CreateAppParams, UpdateAppParams},
  242. trash::RepeatedTrashId,
  243. view::{CreateViewParams, RepeatedViewId, UpdateViewParams, ViewId},
  244. workspace::{CreateWorkspaceParams, UpdateWorkspaceParams, WorkspaceId},
  245. };
  246. use flowy_folder_data_model::revision::{
  247. gen_app_id, gen_workspace_id, AppRevision, TrashRevision, ViewRevision, WorkspaceRevision,
  248. };
  249. use flowy_text_block::BlockCloudService;
  250. use flowy_user::entities::{
  251. SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams, UserProfile,
  252. };
  253. use flowy_user::event_map::UserCloudService;
  254. use lib_infra::{future::FutureResult, util::timestamp};
  255. impl FolderCouldServiceV1 for LocalServer {
  256. fn init(&self) {}
  257. fn create_workspace(
  258. &self,
  259. _token: &str,
  260. params: CreateWorkspaceParams,
  261. ) -> FutureResult<WorkspaceRevision, FlowyError> {
  262. let time = timestamp();
  263. let workspace = WorkspaceRevision {
  264. id: gen_workspace_id(),
  265. name: params.name,
  266. desc: params.desc,
  267. apps: vec![],
  268. modified_time: time,
  269. create_time: time,
  270. };
  271. FutureResult::new(async { Ok(workspace) })
  272. }
  273. fn read_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<Vec<WorkspaceRevision>, FlowyError> {
  274. FutureResult::new(async { Ok(vec![]) })
  275. }
  276. fn update_workspace(&self, _token: &str, _params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
  277. FutureResult::new(async { Ok(()) })
  278. }
  279. fn delete_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<(), FlowyError> {
  280. FutureResult::new(async { Ok(()) })
  281. }
  282. fn create_view(&self, _token: &str, params: CreateViewParams) -> FutureResult<ViewRevision, FlowyError> {
  283. let time = timestamp();
  284. let view = ViewRevision {
  285. id: params.view_id,
  286. belong_to_id: params.belong_to_id,
  287. name: params.name,
  288. desc: params.desc,
  289. data_type: params.data_type.into(),
  290. version: 0,
  291. belongings: vec![],
  292. modified_time: time,
  293. create_time: time,
  294. ext_data: "".to_string(),
  295. thumbnail: params.thumbnail,
  296. plugin_type: params.plugin_type,
  297. };
  298. FutureResult::new(async { Ok(view) })
  299. }
  300. fn read_view(&self, _token: &str, _params: ViewId) -> FutureResult<Option<ViewRevision>, FlowyError> {
  301. FutureResult::new(async { Ok(None) })
  302. }
  303. fn delete_view(&self, _token: &str, _params: RepeatedViewId) -> FutureResult<(), FlowyError> {
  304. FutureResult::new(async { Ok(()) })
  305. }
  306. fn update_view(&self, _token: &str, _params: UpdateViewParams) -> FutureResult<(), FlowyError> {
  307. FutureResult::new(async { Ok(()) })
  308. }
  309. fn create_app(&self, _token: &str, params: CreateAppParams) -> FutureResult<AppRevision, FlowyError> {
  310. let time = timestamp();
  311. let app = AppRevision {
  312. id: gen_app_id(),
  313. workspace_id: params.workspace_id,
  314. name: params.name,
  315. desc: params.desc,
  316. belongings: vec![],
  317. version: 0,
  318. modified_time: time,
  319. create_time: time,
  320. };
  321. FutureResult::new(async { Ok(app) })
  322. }
  323. fn read_app(&self, _token: &str, _params: AppId) -> FutureResult<Option<AppRevision>, FlowyError> {
  324. FutureResult::new(async { Ok(None) })
  325. }
  326. fn update_app(&self, _token: &str, _params: UpdateAppParams) -> FutureResult<(), FlowyError> {
  327. FutureResult::new(async { Ok(()) })
  328. }
  329. fn delete_app(&self, _token: &str, _params: AppId) -> FutureResult<(), FlowyError> {
  330. FutureResult::new(async { Ok(()) })
  331. }
  332. fn create_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
  333. FutureResult::new(async { Ok(()) })
  334. }
  335. fn delete_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
  336. FutureResult::new(async { Ok(()) })
  337. }
  338. fn read_trash(&self, _token: &str) -> FutureResult<Vec<TrashRevision>, FlowyError> {
  339. FutureResult::new(async { Ok(vec![]) })
  340. }
  341. }
  342. impl UserCloudService for LocalServer {
  343. fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> {
  344. let uid = nanoid!(20);
  345. FutureResult::new(async move {
  346. Ok(SignUpResponse {
  347. user_id: uid.clone(),
  348. name: params.name,
  349. email: params.email,
  350. token: uid,
  351. })
  352. })
  353. }
  354. fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> {
  355. let user_id = nanoid!(20);
  356. FutureResult::new(async {
  357. Ok(SignInResponse {
  358. user_id: user_id.clone(),
  359. name: params.name,
  360. email: params.email,
  361. token: user_id,
  362. })
  363. })
  364. }
  365. fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> {
  366. FutureResult::new(async { Ok(()) })
  367. }
  368. fn update_user(&self, _token: &str, _params: UpdateUserProfileParams) -> FutureResult<(), FlowyError> {
  369. FutureResult::new(async { Ok(()) })
  370. }
  371. fn get_user(&self, _token: &str) -> FutureResult<UserProfile, FlowyError> {
  372. FutureResult::new(async { Ok(UserProfile::default()) })
  373. }
  374. fn ws_addr(&self) -> String {
  375. "ws://localhost:8000/ws/".to_owned()
  376. }
  377. }
  378. impl BlockCloudService for LocalServer {
  379. fn create_block(&self, _token: &str, _params: CreateTextBlockParams) -> FutureResult<(), FlowyError> {
  380. FutureResult::new(async { Ok(()) })
  381. }
  382. fn read_block(&self, _token: &str, params: TextBlockId) -> FutureResult<Option<TextBlockInfo>, FlowyError> {
  383. let doc = TextBlockInfo {
  384. block_id: params.value,
  385. text: initial_quill_delta_string(),
  386. rev_id: 0,
  387. base_rev_id: 0,
  388. };
  389. FutureResult::new(async { Ok(Some(doc)) })
  390. }
  391. fn update_block(&self, _token: &str, _params: ResetTextBlockParams) -> FutureResult<(), FlowyError> {
  392. FutureResult::new(async { Ok(()) })
  393. }
  394. }