server.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. use crate::local_server::persistence::LocalDocumentCloudPersistence;
  2. use async_stream::stream;
  3. use bytes::Bytes;
  4. use flowy_error::{internal_error, FlowyError};
  5. use flowy_folder::event_map::FolderCouldServiceV1;
  6. use flowy_sync::{
  7. errors::CollaborateError,
  8. server_document::ServerDocumentManager,
  9. server_folder::ServerFolderManager,
  10. synchronizer::{RevisionSyncResponse, RevisionUser},
  11. };
  12. use futures_util::stream::StreamExt;
  13. use lib_ws::{WSChannel, WebSocketRawMessage};
  14. use nanoid::nanoid;
  15. use parking_lot::RwLock;
  16. use std::{
  17. convert::{TryFrom, TryInto},
  18. fmt::Debug,
  19. sync::Arc,
  20. };
  21. use tokio::sync::{broadcast, mpsc, mpsc::UnboundedSender};
  22. pub struct LocalServer {
  23. doc_manager: Arc<ServerDocumentManager>,
  24. folder_manager: Arc<ServerFolderManager>,
  25. stop_tx: RwLock<Option<mpsc::Sender<()>>>,
  26. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  27. client_ws_receiver: broadcast::Sender<WebSocketRawMessage>,
  28. }
  29. impl LocalServer {
  30. pub fn new(
  31. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  32. client_ws_receiver: broadcast::Sender<WebSocketRawMessage>,
  33. ) -> Self {
  34. let persistence = Arc::new(LocalDocumentCloudPersistence::default());
  35. let doc_manager = Arc::new(ServerDocumentManager::new(persistence.clone()));
  36. let folder_manager = Arc::new(ServerFolderManager::new(persistence));
  37. let stop_tx = RwLock::new(None);
  38. LocalServer {
  39. doc_manager,
  40. folder_manager,
  41. stop_tx,
  42. client_ws_sender,
  43. client_ws_receiver,
  44. }
  45. }
  46. pub async fn stop(&self) {
  47. if let Some(stop_tx) = self.stop_tx.read().clone() {
  48. let _ = stop_tx.send(()).await;
  49. }
  50. }
  51. pub fn run(&self) {
  52. let (stop_tx, stop_rx) = mpsc::channel(1);
  53. *self.stop_tx.write() = Some(stop_tx);
  54. let runner = LocalWebSocketRunner {
  55. doc_manager: self.doc_manager.clone(),
  56. folder_manager: self.folder_manager.clone(),
  57. stop_rx: Some(stop_rx),
  58. client_ws_sender: self.client_ws_sender.clone(),
  59. client_ws_receiver: Some(self.client_ws_receiver.subscribe()),
  60. };
  61. tokio::spawn(runner.run());
  62. }
  63. }
  64. struct LocalWebSocketRunner {
  65. doc_manager: Arc<ServerDocumentManager>,
  66. folder_manager: Arc<ServerFolderManager>,
  67. stop_rx: Option<mpsc::Receiver<()>>,
  68. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  69. client_ws_receiver: Option<broadcast::Receiver<WebSocketRawMessage>>,
  70. }
  71. impl LocalWebSocketRunner {
  72. pub async fn run(mut self) {
  73. let mut stop_rx = self.stop_rx.take().expect("Only run once");
  74. let mut client_ws_receiver = self.client_ws_receiver.take().expect("Only run once");
  75. let stream = stream! {
  76. loop {
  77. tokio::select! {
  78. result = client_ws_receiver.recv() => {
  79. match result {
  80. Ok(msg) => yield msg,
  81. Err(_e) => {},
  82. }
  83. },
  84. _ = stop_rx.recv() => {
  85. tracing::trace!("[LocalWebSocketRunner] stop");
  86. break
  87. },
  88. };
  89. }
  90. };
  91. stream
  92. .for_each(|message| async {
  93. match self.handle_message(message).await {
  94. Ok(_) => {}
  95. Err(e) => tracing::error!("[LocalWebSocketRunner]: {}", e),
  96. }
  97. })
  98. .await;
  99. }
  100. async fn handle_message(&self, message: WebSocketRawMessage) -> Result<(), FlowyError> {
  101. let bytes = Bytes::from(message.data);
  102. let client_data = ClientRevisionWSData::try_from(bytes).map_err(internal_error)?;
  103. match message.channel {
  104. WSChannel::Document => {
  105. self.handle_document_client_data(client_data, "".to_owned()).await?;
  106. Ok(())
  107. }
  108. WSChannel::Folder => {
  109. self.handle_folder_client_data(client_data, "".to_owned()).await?;
  110. Ok(())
  111. }
  112. WSChannel::Grid => {
  113. todo!("Implement grid web socket channel")
  114. }
  115. }
  116. }
  117. pub async fn handle_folder_client_data(
  118. &self,
  119. client_data: ClientRevisionWSData,
  120. user_id: String,
  121. ) -> Result<(), CollaborateError> {
  122. tracing::trace!(
  123. "[LocalFolderServer] receive: {}:{}-{:?} ",
  124. client_data.object_id,
  125. client_data.rev_id,
  126. client_data.ty,
  127. );
  128. let client_ws_sender = self.client_ws_sender.clone();
  129. let user = Arc::new(LocalRevisionUser {
  130. user_id,
  131. client_ws_sender,
  132. channel: WSChannel::Folder,
  133. });
  134. let ty = client_data.ty.clone();
  135. match ty {
  136. ClientRevisionWSDataType::ClientPushRev => {
  137. self.folder_manager.handle_client_revisions(user, client_data).await?;
  138. }
  139. ClientRevisionWSDataType::ClientPing => {
  140. self.folder_manager.handle_client_ping(user, client_data).await?;
  141. }
  142. }
  143. Ok(())
  144. }
  145. pub async fn handle_document_client_data(
  146. &self,
  147. client_data: ClientRevisionWSData,
  148. user_id: String,
  149. ) -> Result<(), CollaborateError> {
  150. tracing::trace!(
  151. "[LocalDocumentServer] receive: {}:{}-{:?} ",
  152. client_data.object_id,
  153. client_data.rev_id,
  154. client_data.ty,
  155. );
  156. let client_ws_sender = self.client_ws_sender.clone();
  157. let user = Arc::new(LocalRevisionUser {
  158. user_id,
  159. client_ws_sender,
  160. channel: WSChannel::Document,
  161. });
  162. let ty = client_data.ty.clone();
  163. match ty {
  164. ClientRevisionWSDataType::ClientPushRev => {
  165. self.doc_manager.handle_client_revisions(user, client_data).await?;
  166. }
  167. ClientRevisionWSDataType::ClientPing => {
  168. self.doc_manager.handle_client_ping(user, client_data).await?;
  169. }
  170. }
  171. Ok(())
  172. }
  173. }
  174. #[derive(Debug)]
  175. struct LocalRevisionUser {
  176. user_id: String,
  177. client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
  178. channel: WSChannel,
  179. }
  180. impl RevisionUser for LocalRevisionUser {
  181. fn user_id(&self) -> String {
  182. self.user_id.clone()
  183. }
  184. fn receive(&self, resp: RevisionSyncResponse) {
  185. let sender = self.client_ws_sender.clone();
  186. let send_fn = |sender: UnboundedSender<WebSocketRawMessage>, msg: WebSocketRawMessage| match sender.send(msg) {
  187. Ok(_) => {}
  188. Err(e) => {
  189. tracing::error!("LocalDocumentUser send message failed: {}", e);
  190. }
  191. };
  192. let channel = self.channel.clone();
  193. tokio::spawn(async move {
  194. match resp {
  195. RevisionSyncResponse::Pull(data) => {
  196. let bytes: Bytes = data.try_into().unwrap();
  197. let msg = WebSocketRawMessage {
  198. channel,
  199. data: bytes.to_vec(),
  200. };
  201. send_fn(sender, msg);
  202. }
  203. RevisionSyncResponse::Push(data) => {
  204. let bytes: Bytes = data.try_into().unwrap();
  205. let msg = WebSocketRawMessage {
  206. channel,
  207. data: bytes.to_vec(),
  208. };
  209. send_fn(sender, msg);
  210. }
  211. RevisionSyncResponse::Ack(data) => {
  212. let bytes: Bytes = data.try_into().unwrap();
  213. let msg = WebSocketRawMessage {
  214. channel,
  215. data: bytes.to_vec(),
  216. };
  217. send_fn(sender, msg);
  218. }
  219. }
  220. });
  221. }
  222. }
  223. use flowy_document::DocumentCloudService;
  224. use flowy_folder::entities::{
  225. app::{AppIdPB, CreateAppParams, UpdateAppParams},
  226. trash::RepeatedTrashIdPB,
  227. view::{CreateViewParams, RepeatedViewIdPB, UpdateViewParams, ViewIdPB},
  228. workspace::{CreateWorkspaceParams, UpdateWorkspaceParams, WorkspaceIdPB},
  229. };
  230. use flowy_http_model::document::{CreateDocumentParams, DocumentId, DocumentPayload, ResetDocumentParams};
  231. use flowy_http_model::ws_data::{ClientRevisionWSData, ClientRevisionWSDataType};
  232. use flowy_user::entities::{
  233. SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams, UserProfilePB,
  234. };
  235. use flowy_user::event_map::UserCloudService;
  236. use folder_rev_model::{gen_app_id, gen_workspace_id, AppRevision, TrashRevision, ViewRevision, WorkspaceRevision};
  237. use lib_infra::{future::FutureResult, util::timestamp};
  238. impl FolderCouldServiceV1 for LocalServer {
  239. fn init(&self) {}
  240. fn create_workspace(
  241. &self,
  242. _token: &str,
  243. params: CreateWorkspaceParams,
  244. ) -> FutureResult<WorkspaceRevision, FlowyError> {
  245. let time = timestamp();
  246. let workspace = WorkspaceRevision {
  247. id: gen_workspace_id(),
  248. name: params.name,
  249. desc: params.desc,
  250. apps: vec![],
  251. modified_time: time,
  252. create_time: time,
  253. };
  254. FutureResult::new(async { Ok(workspace) })
  255. }
  256. fn read_workspace(&self, _token: &str, _params: WorkspaceIdPB) -> FutureResult<Vec<WorkspaceRevision>, FlowyError> {
  257. FutureResult::new(async { Ok(vec![]) })
  258. }
  259. fn update_workspace(&self, _token: &str, _params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
  260. FutureResult::new(async { Ok(()) })
  261. }
  262. fn delete_workspace(&self, _token: &str, _params: WorkspaceIdPB) -> FutureResult<(), FlowyError> {
  263. FutureResult::new(async { Ok(()) })
  264. }
  265. fn create_view(&self, _token: &str, params: CreateViewParams) -> FutureResult<ViewRevision, FlowyError> {
  266. let time = timestamp();
  267. let view = ViewRevision {
  268. id: params.view_id,
  269. app_id: params.belong_to_id,
  270. name: params.name,
  271. desc: params.desc,
  272. data_format: params.data_format.into(),
  273. version: 0,
  274. belongings: vec![],
  275. modified_time: time,
  276. create_time: time,
  277. ext_data: "".to_string(),
  278. thumbnail: params.thumbnail,
  279. layout: params.layout.into(),
  280. };
  281. FutureResult::new(async { Ok(view) })
  282. }
  283. fn read_view(&self, _token: &str, _params: ViewIdPB) -> FutureResult<Option<ViewRevision>, FlowyError> {
  284. FutureResult::new(async { Ok(None) })
  285. }
  286. fn delete_view(&self, _token: &str, _params: RepeatedViewIdPB) -> FutureResult<(), FlowyError> {
  287. FutureResult::new(async { Ok(()) })
  288. }
  289. fn update_view(&self, _token: &str, _params: UpdateViewParams) -> FutureResult<(), FlowyError> {
  290. FutureResult::new(async { Ok(()) })
  291. }
  292. fn create_app(&self, _token: &str, params: CreateAppParams) -> FutureResult<AppRevision, FlowyError> {
  293. let time = timestamp();
  294. let app = AppRevision {
  295. id: gen_app_id(),
  296. workspace_id: params.workspace_id,
  297. name: params.name,
  298. desc: params.desc,
  299. belongings: vec![],
  300. version: 0,
  301. modified_time: time,
  302. create_time: time,
  303. };
  304. FutureResult::new(async { Ok(app) })
  305. }
  306. fn read_app(&self, _token: &str, _params: AppIdPB) -> FutureResult<Option<AppRevision>, FlowyError> {
  307. FutureResult::new(async { Ok(None) })
  308. }
  309. fn update_app(&self, _token: &str, _params: UpdateAppParams) -> FutureResult<(), FlowyError> {
  310. FutureResult::new(async { Ok(()) })
  311. }
  312. fn delete_app(&self, _token: &str, _params: AppIdPB) -> FutureResult<(), FlowyError> {
  313. FutureResult::new(async { Ok(()) })
  314. }
  315. fn create_trash(&self, _token: &str, _params: RepeatedTrashIdPB) -> FutureResult<(), FlowyError> {
  316. FutureResult::new(async { Ok(()) })
  317. }
  318. fn delete_trash(&self, _token: &str, _params: RepeatedTrashIdPB) -> FutureResult<(), FlowyError> {
  319. FutureResult::new(async { Ok(()) })
  320. }
  321. fn read_trash(&self, _token: &str) -> FutureResult<Vec<TrashRevision>, FlowyError> {
  322. FutureResult::new(async { Ok(vec![]) })
  323. }
  324. }
  325. impl UserCloudService for LocalServer {
  326. fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> {
  327. let uid = nanoid!(20);
  328. FutureResult::new(async move {
  329. Ok(SignUpResponse {
  330. user_id: uid.clone(),
  331. name: params.name,
  332. email: params.email,
  333. token: uid,
  334. })
  335. })
  336. }
  337. fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> {
  338. let user_id = nanoid!(20);
  339. FutureResult::new(async {
  340. Ok(SignInResponse {
  341. user_id: user_id.clone(),
  342. name: params.name,
  343. email: params.email,
  344. token: user_id,
  345. })
  346. })
  347. }
  348. fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> {
  349. FutureResult::new(async { Ok(()) })
  350. }
  351. fn update_user(&self, _token: &str, _params: UpdateUserProfileParams) -> FutureResult<(), FlowyError> {
  352. FutureResult::new(async { Ok(()) })
  353. }
  354. fn get_user(&self, _token: &str) -> FutureResult<UserProfilePB, FlowyError> {
  355. FutureResult::new(async { Ok(UserProfilePB::default()) })
  356. }
  357. fn ws_addr(&self) -> String {
  358. "ws://localhost:8000/ws/".to_owned()
  359. }
  360. }
  361. impl DocumentCloudService for LocalServer {
  362. fn create_document(&self, _token: &str, _params: CreateDocumentParams) -> FutureResult<(), FlowyError> {
  363. FutureResult::new(async { Ok(()) })
  364. }
  365. fn fetch_document(&self, _token: &str, _params: DocumentId) -> FutureResult<Option<DocumentPayload>, FlowyError> {
  366. FutureResult::new(async { Ok(None) })
  367. }
  368. fn update_document_content(&self, _token: &str, _params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
  369. FutureResult::new(async { Ok(()) })
  370. }
  371. }