system.rs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. use crate::module::{as_module_map, Module, ModuleMap};
  2. use futures_core::{ready, task::Context};
  3. use std::{cell::RefCell, fmt::Debug, future::Future, io, sync::Arc};
  4. use tokio::{
  5. macros::support::{Pin, Poll},
  6. sync::{
  7. mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
  8. oneshot,
  9. },
  10. };
  11. thread_local!(
  12. static CURRENT: RefCell<Option<Arc<FlowySystem>>> = RefCell::new(None);
  13. );
  14. #[derive(Debug)]
  15. #[allow(dead_code)]
  16. pub enum SystemCommand {
  17. Exit(i8),
  18. }
  19. pub struct FlowySystem {
  20. sys_cmd_tx: UnboundedSender<SystemCommand>,
  21. }
  22. impl FlowySystem {
  23. #[allow(dead_code)]
  24. pub fn construct<F, S>(module_factory: F, sender_factory: S) -> SystemRunner
  25. where
  26. F: FnOnce() -> Vec<Module>,
  27. S: FnOnce(ModuleMap, &Runtime),
  28. {
  29. let runtime = Arc::new(Runtime::new().unwrap());
  30. let (sys_cmd_tx, sys_cmd_rx) = unbounded_channel::<SystemCommand>();
  31. let (stop_tx, stop_rx) = oneshot::channel();
  32. runtime.spawn(SystemController {
  33. stop_tx: Some(stop_tx),
  34. sys_cmd_rx,
  35. });
  36. let module_map = as_module_map(module_factory());
  37. sender_factory(module_map.clone(), &runtime);
  38. let system = Self { sys_cmd_tx };
  39. FlowySystem::set_current(system);
  40. let runner = SystemRunner {
  41. rt: runtime,
  42. stop_rx,
  43. };
  44. runner
  45. }
  46. #[allow(dead_code)]
  47. pub fn stop(&self) {
  48. match self.sys_cmd_tx.send(SystemCommand::Exit(0)) {
  49. Ok(_) => {},
  50. Err(e) => {
  51. log::error!("Stop system error: {}", e);
  52. },
  53. }
  54. }
  55. #[allow(dead_code)]
  56. pub fn set_current(sys: FlowySystem) {
  57. CURRENT.with(|cell| {
  58. *cell.borrow_mut() = Some(Arc::new(sys));
  59. })
  60. }
  61. #[allow(dead_code)]
  62. pub fn current() -> Arc<FlowySystem> {
  63. CURRENT.with(|cell| match *cell.borrow() {
  64. Some(ref sys) => sys.clone(),
  65. None => panic!("System is not running"),
  66. })
  67. }
  68. }
  69. struct SystemController {
  70. stop_tx: Option<oneshot::Sender<i8>>,
  71. sys_cmd_rx: UnboundedReceiver<SystemCommand>,
  72. }
  73. impl Future for SystemController {
  74. type Output = ();
  75. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  76. loop {
  77. match ready!(Pin::new(&mut self.sys_cmd_rx).poll_recv(cx)) {
  78. None => return Poll::Ready(()),
  79. Some(cmd) => match cmd {
  80. SystemCommand::Exit(code) => {
  81. if let Some(tx) = self.stop_tx.take() {
  82. let _ = tx.send(code);
  83. }
  84. },
  85. },
  86. }
  87. }
  88. }
  89. }
  90. pub struct SystemRunner {
  91. rt: Arc<Runtime>,
  92. stop_rx: oneshot::Receiver<i8>,
  93. }
  94. impl SystemRunner {
  95. #[allow(dead_code)]
  96. pub fn run(self) -> io::Result<()> {
  97. let SystemRunner { rt, stop_rx } = self;
  98. match rt.block_on(stop_rx) {
  99. Ok(code) => {
  100. if code != 0 {
  101. Err(io::Error::new(
  102. io::ErrorKind::Other,
  103. format!("Non-zero exit code: {}", code),
  104. ))
  105. } else {
  106. Ok(())
  107. }
  108. },
  109. Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
  110. }
  111. }
  112. #[allow(dead_code)]
  113. pub fn spawn<F: Future<Output = ()> + 'static>(self, future: F) -> Self {
  114. self.rt.spawn(future);
  115. self
  116. }
  117. }
  118. use crate::util::tokio_default_runtime;
  119. use tokio::{runtime, task::LocalSet};
  120. #[derive(Debug)]
  121. pub struct Runtime {
  122. local: LocalSet,
  123. rt: runtime::Runtime,
  124. }
  125. impl Runtime {
  126. #[allow(dead_code)]
  127. pub fn new() -> io::Result<Runtime> {
  128. let rt = tokio_default_runtime()?;
  129. Ok(Runtime {
  130. rt,
  131. local: LocalSet::new(),
  132. })
  133. }
  134. #[allow(dead_code)]
  135. pub fn spawn<F>(&self, future: F) -> &Self
  136. where
  137. F: Future<Output = ()> + 'static,
  138. {
  139. self.local.spawn_local(future);
  140. self
  141. }
  142. #[allow(dead_code)]
  143. pub fn block_on<F>(&self, f: F) -> F::Output
  144. where
  145. F: Future + 'static,
  146. {
  147. self.local.block_on(&self.rt, f)
  148. }
  149. }