lib.rs 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. pub mod entities;
  2. mod protobuf;
  3. use crate::entities::SubscribeObject;
  4. use bytes::Bytes;
  5. use lazy_static::lazy_static;
  6. use lib_dispatch::prelude::ToBytes;
  7. use std::sync::RwLock;
  8. lazy_static! {
  9. static ref NOTIFICATION_SENDER: RwLock<Vec<Box<dyn NotificationSender>>> = RwLock::new(vec![]);
  10. }
  11. pub fn register_notification_sender<T: NotificationSender>(sender: T) {
  12. let box_sender = Box::new(sender);
  13. match NOTIFICATION_SENDER.write() {
  14. Ok(mut write_guard) => {
  15. write_guard.pop();
  16. write_guard.push(box_sender)
  17. },
  18. Err(err) => tracing::error!("Failed to push notification sender: {:?}", err),
  19. }
  20. }
  21. pub trait NotificationSender: Send + Sync + 'static {
  22. fn send_subject(&self, subject: SubscribeObject) -> Result<(), String>;
  23. }
  24. pub struct NotificationBuilder {
  25. id: String,
  26. payload: Option<Bytes>,
  27. error: Option<Bytes>,
  28. source: String,
  29. ty: i32,
  30. }
  31. impl NotificationBuilder {
  32. pub fn new<T: Into<i32>>(id: &str, ty: T, source: &str) -> Self {
  33. Self {
  34. id: id.to_owned(),
  35. ty: ty.into(),
  36. payload: None,
  37. error: None,
  38. source: source.to_owned(),
  39. }
  40. }
  41. pub fn payload<T>(mut self, payload: T) -> Self
  42. where
  43. T: ToBytes,
  44. {
  45. match payload.into_bytes() {
  46. Ok(bytes) => self.payload = Some(bytes),
  47. Err(e) => {
  48. tracing::error!("Set observable payload failed: {:?}", e);
  49. },
  50. }
  51. self
  52. }
  53. pub fn error<T>(mut self, error: T) -> Self
  54. where
  55. T: ToBytes,
  56. {
  57. match error.into_bytes() {
  58. Ok(bytes) => self.error = Some(bytes),
  59. Err(e) => {
  60. tracing::error!("Set observable error failed: {:?}", e);
  61. },
  62. }
  63. self
  64. }
  65. pub fn send(self) {
  66. let payload = self.payload.map(|bytes| bytes.to_vec());
  67. let error = self.error.map(|bytes| bytes.to_vec());
  68. let subject = SubscribeObject {
  69. source: self.source,
  70. ty: self.ty,
  71. id: self.id,
  72. payload,
  73. error,
  74. };
  75. match NOTIFICATION_SENDER.read() {
  76. Ok(read_guard) => read_guard.iter().for_each(|sender| {
  77. if let Err(e) = sender.send_subject(subject.clone()) {
  78. tracing::error!("Post notification failed: {}", e);
  79. }
  80. }),
  81. Err(err) => {
  82. tracing::error!("Read notification sender failed: {}", err);
  83. },
  84. }
  85. }
  86. }