document.rs 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. use anyhow::Error;
  2. use collab::core::origin::CollabOrigin;
  3. use collab_document::blocks::DocumentData;
  4. use collab_document::document::Document;
  5. use collab_plugins::cloud_storage::CollabType;
  6. use tokio::sync::oneshot::channel;
  7. use flowy_document_deps::cloud::{DocumentCloudService, DocumentSnapshot};
  8. use flowy_error::FlowyError;
  9. use lib_infra::future::FutureResult;
  10. use crate::supabase::api::request::{get_snapshots_from_server, FetchObjectUpdateAction};
  11. use crate::supabase::api::SupabaseServerService;
  /// Document cloud service that forwards its requests to a server handle of type `T`.
  ///
  /// The struct itself places no bounds on `T`; the `DocumentCloudService`
  /// implementation requires `T: SupabaseServerService`.
  pub struct SupabaseDocumentServiceImpl<T> {
    /// Server handle used to obtain the postgrest client for each request.
    server: T,
  }
  15. impl<T> SupabaseDocumentServiceImpl<T> {
  16. pub fn new(server: T) -> Self {
  17. Self { server }
  18. }
  19. }
  20. impl<T> DocumentCloudService for SupabaseDocumentServiceImpl<T>
  21. where
  22. T: SupabaseServerService,
  23. {
  24. #[tracing::instrument(level = "debug", skip(self))]
  25. fn get_document_updates(&self, document_id: &str) -> FutureResult<Vec<Vec<u8>>, Error> {
  26. let try_get_postgrest = self.server.try_get_weak_postgrest();
  27. let document_id = document_id.to_string();
  28. let (tx, rx) = channel();
  29. tokio::spawn(async move {
  30. tx.send(
  31. async move {
  32. let postgrest = try_get_postgrest?;
  33. let action = FetchObjectUpdateAction::new(document_id, CollabType::Document, postgrest);
  34. let updates = action.run_with_fix_interval(5, 10).await?;
  35. if updates.is_empty() {
  36. return Err(FlowyError::collab_not_sync().into());
  37. }
  38. Ok(updates)
  39. }
  40. .await,
  41. )
  42. });
  43. FutureResult::new(async { rx.await? })
  44. }
  45. fn get_document_snapshots(
  46. &self,
  47. document_id: &str,
  48. limit: usize,
  49. ) -> FutureResult<Vec<DocumentSnapshot>, Error> {
  50. let try_get_postgrest = self.server.try_get_postgrest();
  51. let document_id = document_id.to_string();
  52. FutureResult::new(async move {
  53. let postgrest = try_get_postgrest?;
  54. let snapshots = get_snapshots_from_server(&document_id, postgrest, limit)
  55. .await?
  56. .into_iter()
  57. .map(|snapshot| DocumentSnapshot {
  58. snapshot_id: snapshot.sid,
  59. document_id: snapshot.oid,
  60. data: snapshot.blob,
  61. created_at: snapshot.created_at,
  62. })
  63. .collect::<Vec<_>>();
  64. Ok(snapshots)
  65. })
  66. }
  67. #[tracing::instrument(level = "debug", skip(self))]
  68. fn get_document_data(&self, document_id: &str) -> FutureResult<Option<DocumentData>, Error> {
  69. let try_get_postgrest = self.server.try_get_weak_postgrest();
  70. let document_id = document_id.to_string();
  71. let (tx, rx) = channel();
  72. tokio::spawn(async move {
  73. tx.send(
  74. async move {
  75. let postgrest = try_get_postgrest?;
  76. let action =
  77. FetchObjectUpdateAction::new(document_id.clone(), CollabType::Document, postgrest);
  78. let updates = action.run_with_fix_interval(5, 10).await?;
  79. let document =
  80. Document::from_updates(CollabOrigin::Empty, updates, &document_id, vec![])?;
  81. Ok(document.get_document_data().ok())
  82. }
  83. .await,
  84. )
  85. });
  86. FutureResult::new(async { rx.await? })
  87. }
  88. }