pool.rs 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. use crate::{errors::*, pragma::*};
  2. use diesel::{connection::Connection, SqliteConnection};
  3. use r2d2::{CustomizeConnection, ManageConnection, Pool};
  4. use scheduled_thread_pool::ScheduledThreadPool;
  5. use std::{sync::Arc, time::Duration};
  6. lazy_static::lazy_static! {
  7. static ref DB_POOL: Arc<ScheduledThreadPool> = Arc::new(
  8. ScheduledThreadPool::with_name("db-pool-{}:", 4)
  9. );
  10. }
  11. pub struct ConnectionPool {
  12. pub(crate) inner: Pool<ConnectionManager>,
  13. }
  14. impl std::ops::Deref for ConnectionPool {
  15. type Target = Pool<ConnectionManager>;
  16. fn deref(&self) -> &Self::Target { &self.inner }
  17. }
  18. impl ConnectionPool {
  19. pub fn new<T>(config: PoolConfig, uri: T) -> Result<Self>
  20. where
  21. T: Into<String>,
  22. {
  23. let manager = ConnectionManager::new(uri);
  24. let thread_pool = DB_POOL.clone();
  25. let config = Arc::new(config);
  26. let customizer_config = DatabaseCustomizerConfig::default();
  27. let pool = r2d2::Pool::builder()
  28. .thread_pool(thread_pool)
  29. .min_idle(Some(config.min_idle))
  30. .connection_customizer(Box::new(DatabaseCustomizer::new(customizer_config)))
  31. .max_size(config.max_size)
  32. .max_lifetime(None)
  33. .connection_timeout(config.connection_timeout)
  34. .idle_timeout(Some(config.idle_timeout))
  35. .build_unchecked(manager);
  36. Ok(ConnectionPool { inner: pool })
  37. }
  38. }
  /// Boxed, thread-safe (`Send + Sync`) factory closure: each call returns a
  /// freshly boxed callback that takes a `&SqliteConnection` and a `&str`.
  ///
  /// The `Send + Sync` bounds apply to the outer factory only; the callback it
  /// returns carries no such bounds.
  ///
  /// NOTE(review): nothing in this part of the file uses the alias. The `&str`
  /// is presumably the SQL text of the statement being executed; confirm
  /// against the callers.
  pub type OnExecFunc = Box<dyn Fn() -> Box<dyn Fn(&SqliteConnection, &str)> + Send + Sync>;
  /// Sizing and timeout settings consumed by `ConnectionPool::new`.
  ///
  /// Start from [`PoolConfig::default`] and chain the builder-style setters:
  /// each one takes the config by value and returns the updated config.
  #[derive(Debug, Clone)]
  pub struct PoolConfig {
      /// Minimum number of idle connections (forwarded to r2d2's `min_idle`).
      min_idle: u32,
      /// Maximum number of connections (forwarded to r2d2's `max_size`).
      max_size: u32,
      /// Forwarded to r2d2's `connection_timeout`.
      connection_timeout: Duration,
      /// Forwarded to r2d2's `idle_timeout`.
      idle_timeout: Duration,
  }

  impl Default for PoolConfig {
      /// One idle connection, at most ten connections, a ten second connection
      /// timeout and a five minute idle timeout.
      fn default() -> Self {
          Self {
              min_idle: 1,
              max_size: 10,
              connection_timeout: Duration::from_secs(10),
              idle_timeout: Duration::from_secs(5 * 60),
          }
      }
  }

  impl PoolConfig {
      /// Sets the minimum number of idle connections.
      ///
      /// r2d2's builder documents a panic when this exceeds `max_size`.
      #[allow(dead_code)]
      pub fn min_idle(mut self, min_idle: u32) -> Self {
          self.min_idle = min_idle;
          self
      }

      /// Sets the maximum number of connections.
      ///
      /// r2d2's builder documents a panic when this is zero.
      #[allow(dead_code)]
      pub fn max_size(mut self, max_size: u32) -> Self {
          self.max_size = max_size;
          self
      }

      /// Sets the connection timeout forwarded to r2d2's `connection_timeout`.
      ///
      /// r2d2's builder documents a panic for a zero duration.
      // Not called from this file; the allow matches the other setters.
      #[allow(dead_code)]
      pub fn connection_timeout(mut self, connection_timeout: Duration) -> Self {
          self.connection_timeout = connection_timeout;
          self
      }

      /// Sets the idle timeout forwarded to r2d2's `idle_timeout`.
      ///
      /// r2d2's builder documents a panic for a zero duration.
      // Not called from this file; the allow matches the other setters.
      #[allow(dead_code)]
      pub fn idle_timeout(mut self, idle_timeout: Duration) -> Self {
          self.idle_timeout = idle_timeout;
          self
      }
  }
  68. pub struct ConnectionManager {
  69. db_uri: String,
  70. }
  71. impl ManageConnection for ConnectionManager {
  72. type Connection = SqliteConnection;
  73. type Error = crate::Error;
  74. fn connect(&self) -> Result<Self::Connection> { Ok(SqliteConnection::establish(&self.db_uri)?) }
  75. fn is_valid(&self, conn: &mut Self::Connection) -> Result<()> { Ok(conn.execute("SELECT 1").map(|_| ())?) }
  76. fn has_broken(&self, _conn: &mut Self::Connection) -> bool { false }
  77. }
  78. impl ConnectionManager {
  79. pub fn new<S: Into<String>>(uri: S) -> Self { ConnectionManager { db_uri: uri.into() } }
  80. }
  81. #[derive(Debug)]
  82. pub struct DatabaseCustomizerConfig {
  83. pub(crate) journal_mode: SQLiteJournalMode,
  84. pub(crate) synchronous: SQLiteSynchronous,
  85. pub(crate) busy_timeout: i32,
  86. #[allow(dead_code)]
  87. pub(crate) secure_delete: bool,
  88. }
  89. impl Default for DatabaseCustomizerConfig {
  90. fn default() -> Self {
  91. Self {
  92. journal_mode: SQLiteJournalMode::WAL,
  93. synchronous: SQLiteSynchronous::NORMAL,
  94. busy_timeout: 5000,
  95. secure_delete: true,
  96. }
  97. }
  98. }
  99. #[derive(Debug)]
  100. struct DatabaseCustomizer {
  101. config: DatabaseCustomizerConfig,
  102. }
  103. impl DatabaseCustomizer {
  104. fn new(config: DatabaseCustomizerConfig) -> Self
  105. where
  106. Self: Sized,
  107. {
  108. Self { config }
  109. }
  110. }
  111. impl CustomizeConnection<SqliteConnection, crate::Error> for DatabaseCustomizer {
  112. fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<()> {
  113. conn.pragma_set_busy_timeout(self.config.busy_timeout)?;
  114. if self.config.journal_mode != SQLiteJournalMode::WAL {
  115. conn.pragma_set_journal_mode(self.config.journal_mode, None)?;
  116. }
  117. conn.pragma_set_synchronous(self.config.synchronous, None)?;
  118. Ok(())
  119. }
  120. }