system.rs 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. use crate::module::{as_module_map, Module, ModuleMap};
  2. use futures_core::{ready, task::Context};
  3. use std::{cell::RefCell, fmt::Debug, future::Future, io, sync::Arc};
  4. use tokio::{
  5. macros::support::{Pin, Poll},
  6. sync::{
  7. mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
  8. oneshot,
  9. },
  10. };
  11. thread_local!(
  12. static CURRENT: RefCell<Option<Arc<FlowySystem>>> = RefCell::new(None);
  13. );
  /// Commands carried over the system's command channel.
  #[allow(dead_code)]
  #[derive(Debug)]
  pub enum SystemCommand {
      /// Request to exit; the payload is the exit code.
      Exit(i8),
  }
  19. pub struct FlowySystem {
  20. sys_cmd_tx: UnboundedSender<SystemCommand>,
  21. }
  22. impl FlowySystem {
  23. #[allow(dead_code)]
  24. pub fn construct<F, S>(module_factory: F, sender_factory: S) -> SystemRunner
  25. where
  26. F: FnOnce() -> Vec<Module>,
  27. S: FnOnce(ModuleMap, &Runtime),
  28. {
  29. let runtime = Arc::new(Runtime::new().unwrap());
  30. let (sys_cmd_tx, sys_cmd_rx) = unbounded_channel::<SystemCommand>();
  31. let (stop_tx, stop_rx) = oneshot::channel();
  32. runtime.spawn(SystemController {
  33. stop_tx: Some(stop_tx),
  34. sys_cmd_rx,
  35. });
  36. let module_map = as_module_map(module_factory());
  37. sender_factory(module_map, &runtime);
  38. let system = Self { sys_cmd_tx };
  39. FlowySystem::set_current(system);
  40. SystemRunner { rt: runtime, stop_rx }
  41. }
  42. #[allow(dead_code)]
  43. pub fn stop(&self) {
  44. match self.sys_cmd_tx.send(SystemCommand::Exit(0)) {
  45. Ok(_) => {},
  46. Err(e) => {
  47. log::error!("Stop system error: {}", e);
  48. },
  49. }
  50. }
  51. #[allow(dead_code)]
  52. pub fn set_current(sys: FlowySystem) {
  53. CURRENT.with(|cell| {
  54. *cell.borrow_mut() = Some(Arc::new(sys));
  55. })
  56. }
  57. #[allow(dead_code)]
  58. pub fn current() -> Arc<FlowySystem> {
  59. CURRENT.with(|cell| match *cell.borrow() {
  60. Some(ref sys) => sys.clone(),
  61. None => panic!("System is not running"),
  62. })
  63. }
  64. }
  65. struct SystemController {
  66. stop_tx: Option<oneshot::Sender<i8>>,
  67. sys_cmd_rx: UnboundedReceiver<SystemCommand>,
  68. }
  69. impl Future for SystemController {
  70. type Output = ();
  71. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  72. loop {
  73. match ready!(Pin::new(&mut self.sys_cmd_rx).poll_recv(cx)) {
  74. None => return Poll::Ready(()),
  75. Some(cmd) => match cmd {
  76. SystemCommand::Exit(code) => {
  77. if let Some(tx) = self.stop_tx.take() {
  78. let _ = tx.send(code);
  79. }
  80. },
  81. },
  82. }
  83. }
  84. }
  85. }
  86. pub struct SystemRunner {
  87. rt: Arc<Runtime>,
  88. stop_rx: oneshot::Receiver<i8>,
  89. }
  90. impl SystemRunner {
  91. #[allow(dead_code)]
  92. pub fn run(self) -> io::Result<()> {
  93. let SystemRunner { rt, stop_rx } = self;
  94. match rt.block_on(stop_rx) {
  95. Ok(code) => {
  96. if code != 0 {
  97. Err(io::Error::new(
  98. io::ErrorKind::Other,
  99. format!("Non-zero exit code: {}", code),
  100. ))
  101. } else {
  102. Ok(())
  103. }
  104. },
  105. Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
  106. }
  107. }
  108. #[allow(dead_code)]
  109. pub fn spawn<F: Future<Output = ()> + 'static>(self, future: F) -> Self {
  110. self.rt.spawn(future);
  111. self
  112. }
  113. }
  114. use crate::util::tokio_default_runtime;
  115. use tokio::{runtime, task::LocalSet};
  116. #[derive(Debug)]
  117. pub struct Runtime {
  118. local: LocalSet,
  119. rt: runtime::Runtime,
  120. }
  121. impl Runtime {
  122. #[allow(dead_code)]
  123. pub fn new() -> io::Result<Runtime> {
  124. let rt = tokio_default_runtime()?;
  125. Ok(Runtime {
  126. rt,
  127. local: LocalSet::new(),
  128. })
  129. }
  130. #[allow(dead_code)]
  131. pub fn spawn<F>(&self, future: F) -> &Self
  132. where
  133. F: Future<Output = ()> + 'static,
  134. {
  135. self.local.spawn_local(future);
  136. self
  137. }
  138. #[allow(dead_code)]
  139. pub fn block_on<F>(&self, f: F) -> F::Output
  140. where
  141. F: Future + 'static,
  142. {
  143. self.local.block_on(&self.rt, f)
  144. }
  145. }