supabase_realtime.dart 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. import 'dart:async';
  2. import 'dart:convert';
  3. import 'package:appflowy/startup/startup.dart';
  4. import 'package:appflowy/user/application/user_auth_listener.dart';
  5. import 'package:appflowy/user/application/user_service.dart';
  6. import 'package:appflowy_backend/dispatch/dispatch.dart';
  7. import 'package:appflowy_backend/log.dart';
  8. import 'package:appflowy_backend/protobuf/flowy-user/protobuf.dart';
  9. import 'package:supabase_flutter/supabase_flutter.dart';
  10. import 'auth/auth_service.dart';
  11. /// A service to manage realtime interactions with Supabase.
  12. ///
  13. /// `SupbaseRealtimeService` handles subscribing to table changes in Supabase
  14. /// based on the authentication state of a user. The service is initialized with
  15. /// a reference to a Supabase instance and sets up the necessary subscriptions
  16. /// accordingly.
  17. class SupbaseRealtimeService {
  18. final Supabase supabase;
  19. final _authStateListener = UserAuthStateListener();
  20. bool isLoggingOut = false;
  21. RealtimeChannel? channel;
  22. StreamSubscription<AuthState>? authStateSubscription;
  23. SupbaseRealtimeService({required this.supabase}) {
  24. _subscribeAuthState();
  25. _subscribeTablesChanges();
  26. _authStateListener.start(
  27. didSignIn: () {
  28. _subscribeTablesChanges();
  29. isLoggingOut = false;
  30. },
  31. onForceLogout: (message) async {
  32. await getIt<AuthService>().signOut();
  33. channel?.unsubscribe();
  34. channel = null;
  35. if (!isLoggingOut) {
  36. await runAppFlowy();
  37. }
  38. },
  39. );
  40. }
  41. void _subscribeAuthState() {
  42. final auth = Supabase.instance.client.auth;
  43. authStateSubscription = auth.onAuthStateChange.listen((state) async {
  44. Log.info("Supabase auth state change: ${state.event}");
  45. });
  46. }
  47. Future<void> _subscribeTablesChanges() async {
  48. final result = await UserBackendService.getCurrentUserProfile();
  49. result.fold((l) => null, (userProfile) {
  50. Log.info("Start listening supabase table changes");
  51. // https://supabase.com/docs/guides/realtime/postgres-changes
  52. final List<ChannelFilter> filters = [
  53. "document",
  54. "folder",
  55. "database",
  56. "database_row",
  57. "w_database",
  58. ]
  59. .map(
  60. (name) => ChannelFilter(
  61. event: 'INSERT',
  62. schema: 'public',
  63. table: "af_collab_update_$name",
  64. filter: 'uid=eq.${userProfile.id}',
  65. ),
  66. )
  67. .toList();
  68. filters.add(
  69. ChannelFilter(
  70. event: 'UPDATE',
  71. schema: 'public',
  72. table: "af_user",
  73. filter: 'uid=eq.${userProfile.id}',
  74. ),
  75. );
  76. const ops = RealtimeChannelConfig(ack: true);
  77. channel?.unsubscribe();
  78. channel = supabase.client.channel("table-db-changes", opts: ops);
  79. for (final filter in filters) {
  80. channel?.on(
  81. RealtimeListenTypes.postgresChanges,
  82. filter,
  83. (payload, [ref]) {
  84. try {
  85. final jsonStr = jsonEncode(payload);
  86. final pb = RealtimePayloadPB.create()..jsonStr = jsonStr;
  87. UserEventPushRealtimeEvent(pb).send();
  88. } catch (e) {
  89. Log.error(e);
  90. }
  91. },
  92. );
  93. }
  94. channel?.subscribe(
  95. (status, [err]) {
  96. Log.info(
  97. "subscribe channel statue: $status, err: $err",
  98. );
  99. },
  100. );
  101. });
  102. }
  103. Future<void> dispose() async {
  104. await _authStateListener.stop();
  105. await authStateSubscription?.cancel();
  106. await channel?.unsubscribe();
  107. }
  108. }