supabase_realtime.dart 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import 'dart:async';
  2. import 'dart:convert';
  3. import 'package:appflowy/user/application/user_service.dart';
  4. import 'package:appflowy_backend/dispatch/dispatch.dart';
  5. import 'package:appflowy_backend/log.dart';
  6. import 'package:appflowy_backend/protobuf/flowy-user/protobuf.dart';
  7. import 'package:supabase_flutter/supabase_flutter.dart';
  /// A service to manage realtime interactions with Supabase.
  ///
  /// `SupbaseRealtimeService` listens to the authentication state of the given
  /// [Supabase] instance. When a user signs in (or the auth token is
  /// refreshed) it subscribes to `INSERT` events on the `af_collab_update_*`
  /// tables, filtered by the current user's id, and forwards every received
  /// payload as a [UserEventPushRealtimeEvent]. When the user signs out the
  /// subscription is dropped.
  ///
  /// Call [dispose] when the service is no longer needed.
  class SupbaseRealtimeService {
    /// The Supabase instance whose auth state and realtime client are used.
    final Supabase supabase;

    /// The currently active realtime channel, or `null` when not subscribed.
    RealtimeChannel? channel;

    /// Subscription to the auth state stream; cancelled by [dispose].
    StreamSubscription<AuthState>? authStateSubscription;

    /// Creates the service and immediately starts listening to auth changes.
    SupbaseRealtimeService({required this.supabase}) {
      _subscribeAuthState();
    }

    /// Stops listening to auth changes and unsubscribes the active channel.
    Future<void> dispose() async {
      final subscription = authStateSubscription;
      authStateSubscription = null;
      await subscription?.cancel();
      await _unsubscribeChannel();
    }

    /// Starts or stops the table subscription according to auth events.
    void _subscribeAuthState() {
      // Use the injected instance rather than the global singleton so this
      // method agrees with the client used in `_subscribeTablesChanges`.
      final auth = supabase.client.auth;
      authStateSubscription = auth.onAuthStateChange.listen((state) async {
        switch (state.event) {
          case AuthChangeEvent.signedIn:
          case AuthChangeEvent.tokenRefreshed:
            await _subscribeTablesChanges();
            break;
          case AuthChangeEvent.signedOut:
            await _unsubscribeChannel();
            break;
          default:
            break;
        }
      });
    }

    /// Unsubscribes the active channel, if any, and clears [channel].
    Future<void> _unsubscribeChannel() async {
      // Clear the field before awaiting so that a channel created while the
      // unsubscribe is in flight is not overwritten with `null` afterwards.
      final current = channel;
      channel = null;
      await current?.unsubscribe();
    }

    /// Subscribes to inserts on the collab update tables of the current user.
    ///
    /// Does nothing when the current user profile cannot be fetched. Any
    /// previously created channel is unsubscribed first, so repeated calls
    /// (e.g. on every token refresh) do not accumulate duplicate listeners.
    Future<void> _subscribeTablesChanges() async {
      final result = await UserBackendService.getCurrentUserProfile();
      // Best effort: without a user profile there is nothing to filter on.
      result.fold((l) => null, (userProfile) {
        Log.info("Start listening to table changes");

        // Replace the previous channel instead of stacking a new one on top
        // of it. This callback is synchronous, so the swap cannot interleave
        // with another auth event.
        unawaited(channel?.unsubscribe());

        // https://supabase.com/docs/guides/realtime/postgres-changes
        final filters = [
          "document",
          "folder",
          "database",
          "database_row",
          "w_database",
        ].map(
          (name) => ChannelFilter(
            event: 'INSERT',
            schema: 'public',
            table: "af_collab_update_$name",
            filter: 'uid=eq.${userProfile.id}',
          ),
        );

        const ops = RealtimeChannelConfig(ack: true);
        final newChannel =
            supabase.client.channel("table-db-changes", opts: ops);
        channel = newChannel;

        for (final filter in filters) {
          newChannel.on(
            RealtimeListenTypes.postgresChanges,
            filter,
            (payload, [ref]) {
              try {
                // Forward the raw payload as a JSON string.
                final jsonStr = jsonEncode(payload);
                final pb = RealtimePayloadPB.create()..jsonStr = jsonStr;
                UserEventPushRealtimeEvent(pb).send();
              } catch (e) {
                Log.error(e);
              }
            },
          );
        }

        newChannel.subscribe(
          (status, [err]) {
            Log.info(
              "subscribe channel statue: $status, err: $err",
            );
          },
        );
      });
    }
  }