lib.rs 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. pub mod entities;
  2. mod protobuf;
  3. use crate::entities::SubscribeObject;
  4. use bytes::Bytes;
  5. use lazy_static::lazy_static;
  6. use lib_dispatch::prelude::ToBytes;
  7. use std::sync::RwLock;
  8. lazy_static! {
  9. static ref NOTIFICATION_SENDER: RwLock<Vec<Box<dyn NotificationSender>>> = RwLock::new(vec![]);
  10. }
  11. pub fn register_notification_sender<T: NotificationSender>(sender: T) {
  12. let box_sender = Box::new(sender);
  13. match NOTIFICATION_SENDER.write() {
  14. Ok(mut write_guard) => write_guard.push(box_sender),
  15. Err(err) => tracing::error!("Failed to push notification sender: {:?}", err),
  16. }
  17. }
  18. pub trait NotificationSender: Send + Sync + 'static {
  19. fn send_subject(&self, subject: SubscribeObject) -> Result<(), String>;
  20. }
  21. pub struct NotificationBuilder {
  22. id: String,
  23. payload: Option<Bytes>,
  24. error: Option<Bytes>,
  25. source: String,
  26. ty: i32,
  27. }
  28. impl NotificationBuilder {
  29. pub fn new<T: Into<i32>>(id: &str, ty: T, source: &str) -> Self {
  30. Self {
  31. id: id.to_owned(),
  32. ty: ty.into(),
  33. payload: None,
  34. error: None,
  35. source: source.to_owned(),
  36. }
  37. }
  38. pub fn payload<T>(mut self, payload: T) -> Self
  39. where
  40. T: ToBytes,
  41. {
  42. match payload.into_bytes() {
  43. Ok(bytes) => self.payload = Some(bytes),
  44. Err(e) => {
  45. tracing::error!("Set observable payload failed: {:?}", e);
  46. },
  47. }
  48. self
  49. }
  50. pub fn error<T>(mut self, error: T) -> Self
  51. where
  52. T: ToBytes,
  53. {
  54. match error.into_bytes() {
  55. Ok(bytes) => self.error = Some(bytes),
  56. Err(e) => {
  57. tracing::error!("Set observable error failed: {:?}", e);
  58. },
  59. }
  60. self
  61. }
  62. pub fn send(self) {
  63. let payload = self.payload.map(|bytes| bytes.to_vec());
  64. let error = self.error.map(|bytes| bytes.to_vec());
  65. let subject = SubscribeObject {
  66. source: self.source,
  67. ty: self.ty,
  68. id: self.id,
  69. payload,
  70. error,
  71. };
  72. match NOTIFICATION_SENDER.read() {
  73. Ok(read_guard) => read_guard.iter().for_each(|sender| {
  74. if let Err(e) = sender.send_subject(subject.clone()) {
  75. tracing::error!("Post notification failed: {}", e);
  76. }
  77. }),
  78. Err(err) => {
  79. tracing::error!("Read notification sender failed: {}", err);
  80. },
  81. }
  82. }
  83. }