| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 | use crate::runtime::AFPluginRuntime;use crate::{  errors::{DispatchError, Error, InternalError},  module::{as_plugin_map, AFPlugin, AFPluginMap, AFPluginRequest},  response::AFPluginEventResponse,  service::{AFPluginServiceFactory, Service},};use derivative::*;use futures_core::future::BoxFuture;use futures_util::task::Context;use pin_project::pin_project;use std::{future::Future, sync::Arc};use tokio::macros::support::{Pin, Poll};pub struct AFPluginDispatcher {  plugins: AFPluginMap,  runtime: AFPluginRuntime,}impl AFPluginDispatcher {  pub fn construct<F>(runtime: AFPluginRuntime, module_factory: F) -> AFPluginDispatcher  where    F: FnOnce() -> Vec<AFPlugin>,  {    let plugins = module_factory();    tracing::trace!("{}", plugin_info(&plugins));    AFPluginDispatcher {      plugins: as_plugin_map(plugins),      runtime,    }  }  pub fn async_send<Req>(    dispatch: Arc<AFPluginDispatcher>,    request: Req,  ) -> DispatchFuture<AFPluginEventResponse>  where    Req: std::convert::Into<AFPluginRequest>,  {    AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {}))  }  pub fn async_send_with_callback<Req, Callback>(    dispatch: Arc<AFPluginDispatcher>,    request: Req,    callback: Callback,  ) -> DispatchFuture<AFPluginEventResponse>  where    Req: std::convert::Into<AFPluginRequest>,    Callback: FnOnce(AFPluginEventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync,  {    let request: AFPluginRequest = request.into();    let plugins = dispatch.plugins.clone();    let service = Box::new(DispatchService { plugins });    tracing::trace!("Async event: {:?}", &request.event);    let service_ctx = DispatchContext {      request,      callback: Some(Box::new(callback)),    };    let join_handle = dispatch.runtime.spawn(async move {      service.call(service_ctx).await.unwrap_or_else(|e| {        tracing::error!("Dispatch runtime error: {:?}", e);        InternalError::Other(format!("{:?}", e)).as_response()      })    });    DispatchFuture {      fut: Box::pin(async move {        join_handle.await.unwrap_or_else(|e| {          let msg = format!("EVENT_DISPATCH join error: {:?}", e);          tracing::error!("{}", msg);          let error = InternalError::JoinError(msg);          error.as_response()        })      }),    }  }  pub fn sync_send(    dispatch: Arc<AFPluginDispatcher>,    request: AFPluginRequest,  ) -> AFPluginEventResponse {    futures::executor::block_on(async {      AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await    })  }  pub fn spawn<F>(&self, f: F)  where    F: Future<Output = ()> + Send + 'static,  {    self.runtime.spawn(f);  }}#[pin_project]pub struct DispatchFuture<T: Send + Sync> {  #[pin]  pub fut: Pin<Box<dyn Future<Output = T> + Sync + Send>>,}impl<T> Future for DispatchFuture<T>where  T: Send + Sync,{  type Output = T;  fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {    let this = self.as_mut().project();    Poll::Ready(futures_core::ready!(this.fut.poll(cx)))  }}pub type BoxFutureCallback =  Box<dyn FnOnce(AFPluginEventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync>;#[derive(Derivative)]#[derivative(Debug)]pub struct DispatchContext {  pub request: AFPluginRequest,  #[derivative(Debug = "ignore")]  pub callback: Option<BoxFutureCallback>,}impl DispatchContext {  pub(crate) fn into_parts(self) -> (AFPluginRequest, Option<BoxFutureCallback>) {    let DispatchContext { request, callback } = self;    (request, callback)  }}pub(crate) struct DispatchService {  pub(crate) plugins: AFPluginMap,}impl Service<DispatchContext> for DispatchService {  type Response = AFPluginEventResponse;  type Error = DispatchError;  type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;  #[cfg_attr(    feature = "use_tracing",    tracing::instrument(name = "DispatchService", level = "debug", skip(self, ctx))  )]  fn call(&self, ctx: DispatchContext) -> Self::Future {    let module_map = self.plugins.clone();    let (request, callback) = ctx.into_parts();    Box::pin(async move {      let result = {        // print_module_map_info(&module_map);        match module_map.get(&request.event) {          Some(module) => {            tracing::trace!("Handle event: {:?} by {:?}", &request.event, module.name);            let fut = module.new_service(());            let service_fut = fut.await?.call(request);            service_fut.await          },          None => {            let msg = format!("Can not find the event handler. {:?}", request);            tracing::error!("{}", msg);            Err(InternalError::HandleNotFound(msg).into())          },        }      };      let response = result.unwrap_or_else(|e| e.into());      tracing::trace!("Dispatch result: {:?}", response);      if let Some(callback) = callback {        callback(response.clone()).await;      }      Ok(response)    })  }}#[allow(dead_code)]fn plugin_info(plugins: &[AFPlugin]) -> String {  let mut info = format!("{} plugins loaded\n", plugins.len());  for module in plugins {    info.push_str(&format!("-> {} loaded \n", module.name));  }  info}#[allow(dead_code)]fn print_plugins(plugins: &AFPluginMap) {  plugins.iter().for_each(|(k, v)| {    tracing::info!("Event: {:?} plugin : {:?}", k, v.name);  })}
 |