// system.rs (4.2 KB)
  1. use crate::module::{as_module_map, Module, ModuleMap};
  2. use futures_core::{ready, task::Context};
  3. use std::{cell::RefCell, fmt::Debug, future::Future, io, sync::Arc};
  4. use tokio::{
  5. macros::support::{Pin, Poll},
  6. sync::{
  7. mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
  8. oneshot,
  9. },
  10. };
  11. thread_local!(
  12. static CURRENT: RefCell<Option<Arc<FlowySystem>>> = RefCell::new(None);
  13. );
  14. #[derive(Debug)]
  15. #[allow(dead_code)]
  16. pub enum SystemCommand {
  17. Exit(i8),
  18. }
  19. pub struct FlowySystem {
  20. sys_cmd_tx: UnboundedSender<SystemCommand>,
  21. }
  22. impl FlowySystem {
  23. #[allow(dead_code)]
  24. pub fn construct<F, S>(module_factory: F, sender_factory: S) -> SystemRunner
  25. where
  26. F: FnOnce() -> Vec<Module>,
  27. S: FnOnce(ModuleMap, &Runtime),
  28. {
  29. let runtime = Arc::new(Runtime::new().unwrap());
  30. let (sys_cmd_tx, sys_cmd_rx) = unbounded_channel::<SystemCommand>();
  31. let (stop_tx, stop_rx) = oneshot::channel();
  32. runtime.spawn(SystemController {
  33. stop_tx: Some(stop_tx),
  34. sys_cmd_rx,
  35. });
  36. let module_map = as_module_map(module_factory());
  37. sender_factory(module_map.clone(), &runtime);
  38. let system = Self { sys_cmd_tx };
  39. FlowySystem::set_current(system);
  40. let runner = SystemRunner { rt: runtime, stop_rx };
  41. runner
  42. }
  43. #[allow(dead_code)]
  44. pub fn stop(&self) {
  45. match self.sys_cmd_tx.send(SystemCommand::Exit(0)) {
  46. Ok(_) => {},
  47. Err(e) => {
  48. log::error!("Stop system error: {}", e);
  49. },
  50. }
  51. }
  52. #[allow(dead_code)]
  53. pub fn set_current(sys: FlowySystem) {
  54. CURRENT.with(|cell| {
  55. *cell.borrow_mut() = Some(Arc::new(sys));
  56. })
  57. }
  58. #[allow(dead_code)]
  59. pub fn current() -> Arc<FlowySystem> {
  60. CURRENT.with(|cell| match *cell.borrow() {
  61. Some(ref sys) => sys.clone(),
  62. None => panic!("System is not running"),
  63. })
  64. }
  65. }
  66. struct SystemController {
  67. stop_tx: Option<oneshot::Sender<i8>>,
  68. sys_cmd_rx: UnboundedReceiver<SystemCommand>,
  69. }
  70. impl Future for SystemController {
  71. type Output = ();
  72. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  73. loop {
  74. match ready!(Pin::new(&mut self.sys_cmd_rx).poll_recv(cx)) {
  75. None => return Poll::Ready(()),
  76. Some(cmd) => match cmd {
  77. SystemCommand::Exit(code) => {
  78. if let Some(tx) = self.stop_tx.take() {
  79. let _ = tx.send(code);
  80. }
  81. },
  82. },
  83. }
  84. }
  85. }
  86. }
  87. pub struct SystemRunner {
  88. rt: Arc<Runtime>,
  89. stop_rx: oneshot::Receiver<i8>,
  90. }
  91. impl SystemRunner {
  92. #[allow(dead_code)]
  93. pub fn run(self) -> io::Result<()> {
  94. let SystemRunner { rt, stop_rx } = self;
  95. match rt.block_on(stop_rx) {
  96. Ok(code) => {
  97. if code != 0 {
  98. Err(io::Error::new(
  99. io::ErrorKind::Other,
  100. format!("Non-zero exit code: {}", code),
  101. ))
  102. } else {
  103. Ok(())
  104. }
  105. },
  106. Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
  107. }
  108. }
  109. #[allow(dead_code)]
  110. pub fn spawn<F: Future<Output = ()> + 'static>(self, future: F) -> Self {
  111. self.rt.spawn(future);
  112. self
  113. }
  114. }
  115. use crate::util::tokio_default_runtime;
  116. use tokio::{runtime, task::LocalSet};
  117. #[derive(Debug)]
  118. pub struct Runtime {
  119. local: LocalSet,
  120. rt: runtime::Runtime,
  121. }
  122. impl Runtime {
  123. #[allow(dead_code)]
  124. pub fn new() -> io::Result<Runtime> {
  125. let rt = tokio_default_runtime()?;
  126. Ok(Runtime {
  127. rt,
  128. local: LocalSet::new(),
  129. })
  130. }
  131. #[allow(dead_code)]
  132. pub fn spawn<F>(&self, future: F) -> &Self
  133. where
  134. F: Future<Output = ()> + 'static,
  135. {
  136. self.local.spawn_local(future);
  137. self
  138. }
  139. #[allow(dead_code)]
  140. pub fn block_on<F>(&self, f: F) -> F::Output
  141. where
  142. F: Future + 'static,
  143. {
  144. self.local.block_on(&self.rt, f)
  145. }
  146. }