cache.rs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. use crate::{
  2. errors::{internal_error, DocResult},
  3. services::doc::{edit::DocId, Document, UndoResult},
  4. sql_tables::{DocTableChangeset, DocTableSql},
  5. };
  6. use async_stream::stream;
  7. use flowy_database::ConnectionPool;
  8. use flowy_ot::core::{Attribute, Delta, Interval};
  9. use futures::stream::StreamExt;
  10. use std::{cell::RefCell, sync::Arc};
  11. use tokio::sync::{mpsc, oneshot};
  /// Reply channel carried by most [`EditMsg`] variants: a one-shot sender
  /// whose payload is a `DocResult<T>`.
  12. pub type Ret<T> = oneshot::Sender<DocResult<T>>;
  13. pub enum EditMsg {
  14. Delta {
  15. delta: Delta,
  16. ret: Ret<()>,
  17. },
  18. Insert {
  19. index: usize,
  20. data: String,
  21. ret: Ret<Delta>,
  22. },
  23. Delete {
  24. interval: Interval,
  25. ret: Ret<Delta>,
  26. },
  27. Format {
  28. interval: Interval,
  29. attribute: Attribute,
  30. ret: Ret<Delta>,
  31. },
  32. Replace {
  33. interval: Interval,
  34. data: String,
  35. ret: Ret<Delta>,
  36. },
  37. CanUndo {
  38. ret: oneshot::Sender<bool>,
  39. },
  40. CanRedo {
  41. ret: oneshot::Sender<bool>,
  42. },
  43. Undo {
  44. ret: Ret<UndoResult>,
  45. },
  46. Redo {
  47. ret: Ret<UndoResult>,
  48. },
  49. Doc {
  50. ret: Ret<String>,
  51. },
  52. SaveRevision {
  53. rev_id: i64,
  54. ret: Ret<()>,
  55. },
  56. }
  57. pub struct DocumentEditActor {
  58. doc_id: DocId,
  59. document: RefCell<Document>,
  60. pool: Arc<ConnectionPool>,
  61. receiver: Option<mpsc::UnboundedReceiver<EditMsg>>,
  62. }
  63. impl DocumentEditActor {
  64. pub fn new(
  65. doc_id: &str,
  66. delta: Delta,
  67. pool: Arc<ConnectionPool>,
  68. receiver: mpsc::UnboundedReceiver<EditMsg>,
  69. ) -> Self {
  70. let doc_id = doc_id.to_string();
  71. let document = RefCell::new(Document::from_delta(delta));
  72. Self {
  73. doc_id,
  74. document,
  75. pool,
  76. receiver: Some(receiver),
  77. }
  78. }
  79. pub async fn run(mut self) {
  80. let mut receiver = self.receiver.take().expect("Should only call once");
  81. let stream = stream! {
  82. loop {
  83. match receiver.recv().await {
  84. Some(msg) => yield msg,
  85. None => break,
  86. }
  87. }
  88. };
  89. stream.for_each(|msg| self.handle_message(msg)).await;
  90. }
  91. async fn handle_message(&self, msg: EditMsg) {
  92. match msg {
  93. EditMsg::Delta { delta, ret } => {
  94. let result = self.document.borrow_mut().compose_delta(&delta);
  95. let _ = ret.send(result);
  96. },
  97. EditMsg::Insert { index, data, ret } => {
  98. let delta = self.document.borrow_mut().insert(index, data);
  99. let _ = ret.send(delta);
  100. },
  101. EditMsg::Delete { interval, ret } => {
  102. let result = self.document.borrow_mut().delete(interval);
  103. let _ = ret.send(result);
  104. },
  105. EditMsg::Format {
  106. interval,
  107. attribute,
  108. ret,
  109. } => {
  110. let result = self.document.borrow_mut().format(interval, attribute);
  111. let _ = ret.send(result);
  112. },
  113. EditMsg::Replace { interval, data, ret } => {
  114. let result = self.document.borrow_mut().replace(interval, data);
  115. let _ = ret.send(result);
  116. },
  117. EditMsg::CanUndo { ret } => {
  118. let _ = ret.send(self.document.borrow().can_undo());
  119. },
  120. EditMsg::CanRedo { ret } => {
  121. let _ = ret.send(self.document.borrow().can_redo());
  122. },
  123. EditMsg::Undo { ret } => {
  124. let result = self.document.borrow_mut().undo();
  125. let _ = ret.send(result);
  126. },
  127. EditMsg::Redo { ret } => {
  128. let result = self.document.borrow_mut().redo();
  129. let _ = ret.send(result);
  130. },
  131. EditMsg::Doc { ret } => {
  132. let data = self.document.borrow().to_json();
  133. let _ = ret.send(Ok(data));
  134. },
  135. EditMsg::SaveRevision { rev_id, ret } => {
  136. let result = self.save_to_disk(rev_id);
  137. let _ = ret.send(result);
  138. },
  139. }
  140. }
  141. #[tracing::instrument(level = "debug", skip(self, rev_id), err)]
  142. fn save_to_disk(&self, rev_id: i64) -> DocResult<()> {
  143. let data = self.document.borrow().to_json();
  144. let changeset = DocTableChangeset {
  145. id: self.doc_id.clone(),
  146. data,
  147. rev_id,
  148. };
  149. let sql = DocTableSql {};
  150. let conn = self.pool.get().map_err(internal_error)?;
  151. let _ = sql.update_doc_table(changeset, &*conn)?;
  152. Ok(())
  153. }
  154. }
  // #[tracing::instrument(level = "debug", skip(self, params), err)]
  // fn update_doc_on_server(&self, params: UpdateDocParams) -> Result<(), DocError> {
  //     let token = self.user.token()?;
  //     let server = self.server.clone();
  //     tokio::spawn(async move {
  //         match server.update_doc(&token, params).await {
  //             Ok(_) => {},
  //             Err(e) => {
  //                 // TODO: retry?
  //                 log::error!("Update doc failed: {}", e);
  //             },
  //         }
  //     });
  //     Ok(())
  // }