// pool.rs (3.9 KB)
  1. use crate::{errors::*, pragma::*};
  2. use diesel::{connection::Connection, SqliteConnection};
  3. use r2d2::{CustomizeConnection, ManageConnection, Pool};
  4. use scheduled_thread_pool::ScheduledThreadPool;
  5. use std::{sync::Arc, time::Duration};
  6. lazy_static::lazy_static! {
  7. static ref DB_POOL: Arc<ScheduledThreadPool> = Arc::new(
  8. ScheduledThreadPool::with_name("db-pool-{}:", 4)
  9. );
  10. }
  11. pub struct ConnectionPool {
  12. pub(crate) inner: Pool<ConnectionManager>,
  13. }
  14. impl std::ops::Deref for ConnectionPool {
  15. type Target = Pool<ConnectionManager>;
  16. fn deref(&self) -> &Self::Target {
  17. &self.inner
  18. }
  19. }
  20. impl ConnectionPool {
  21. pub fn new<T>(config: PoolConfig, uri: T) -> Result<Self>
  22. where
  23. T: Into<String>,
  24. {
  25. let manager = ConnectionManager::new(uri);
  26. let thread_pool = DB_POOL.clone();
  27. let config = Arc::new(config);
  28. let customizer_config = DatabaseCustomizerConfig::default();
  29. let pool = r2d2::Pool::builder()
  30. .thread_pool(thread_pool)
  31. .min_idle(Some(config.min_idle))
  32. .connection_customizer(Box::new(DatabaseCustomizer::new(customizer_config)))
  33. .max_size(config.max_size)
  34. .max_lifetime(None)
  35. .connection_timeout(config.connection_timeout)
  36. .idle_timeout(Some(config.idle_timeout))
  37. .build_unchecked(manager);
  38. Ok(ConnectionPool { inner: pool })
  39. }
  40. }
  41. pub type OnExecFunc = Box<dyn Fn() -> Box<dyn Fn(&SqliteConnection, &str)> + Send + Sync>;
  /// Sizing and timeout settings read by `ConnectionPool::new`.
  ///
  /// Start from [`PoolConfig::default`] and adjust individual values with the
  /// builder-style setters, each of which consumes and returns the config.
  #[derive(Debug, Clone)]
  pub struct PoolConfig {
      /// Forwarded to the r2d2 builder as `min_idle`.
      min_idle: u32,
      /// Forwarded to the r2d2 builder as `max_size`.
      max_size: u32,
      /// Forwarded to the r2d2 builder as `connection_timeout`.
      connection_timeout: Duration,
      /// Forwarded to the r2d2 builder as `idle_timeout`.
      idle_timeout: Duration,
  }

  impl Default for PoolConfig {
      /// Defaults: 1 idle connection, at most 10 connections, a 10 second
      /// connection timeout and a 5 minute idle timeout.
      fn default() -> Self {
          Self {
              min_idle: 1,
              max_size: 10,
              connection_timeout: Duration::from_secs(10),
              idle_timeout: Duration::from_secs(5 * 60),
          }
      }
  }

  impl PoolConfig {
      /// Sets the `min_idle` value and returns the updated config.
      // `allow(dead_code)` keeps the lint quiet when no caller uses this setter.
      #[allow(dead_code)]
      pub fn min_idle(mut self, min_idle: u32) -> Self {
          self.min_idle = min_idle;
          self
      }

      /// Sets the `max_size` value and returns the updated config.
      // `allow(dead_code)` keeps the lint quiet when no caller uses this setter.
      #[allow(dead_code)]
      pub fn max_size(mut self, max_size: u32) -> Self {
          self.max_size = max_size;
          self
      }

      /// Sets the `connection_timeout` value and returns the updated config.
      ///
      /// NOTE(review): r2d2 is expected to reject a zero duration when the pool
      /// is built; confirm against the r2d2 `Builder` documentation.
      // `allow(dead_code)` keeps the lint quiet when no caller uses this setter.
      #[allow(dead_code)]
      pub fn connection_timeout(mut self, connection_timeout: Duration) -> Self {
          self.connection_timeout = connection_timeout;
          self
      }

      /// Sets the `idle_timeout` value and returns the updated config.
      ///
      /// NOTE(review): r2d2 is expected to reject a zero duration when the pool
      /// is built; confirm against the r2d2 `Builder` documentation.
      // `allow(dead_code)` keeps the lint quiet when no caller uses this setter.
      #[allow(dead_code)]
      pub fn idle_timeout(mut self, idle_timeout: Duration) -> Self {
          self.idle_timeout = idle_timeout;
          self
      }
  }
  70. pub struct ConnectionManager {
  71. db_uri: String,
  72. }
  73. impl ManageConnection for ConnectionManager {
  74. type Connection = SqliteConnection;
  75. type Error = crate::Error;
  76. fn connect(&self) -> Result<Self::Connection> {
  77. Ok(SqliteConnection::establish(&self.db_uri)?)
  78. }
  79. fn is_valid(&self, conn: &mut Self::Connection) -> Result<()> {
  80. Ok(conn.execute("SELECT 1").map(|_| ())?)
  81. }
  82. fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
  83. false
  84. }
  85. }
  86. impl ConnectionManager {
  87. pub fn new<S: Into<String>>(uri: S) -> Self {
  88. ConnectionManager { db_uri: uri.into() }
  89. }
  90. }
  91. #[derive(Debug)]
  92. pub struct DatabaseCustomizerConfig {
  93. pub(crate) journal_mode: SQLiteJournalMode,
  94. pub(crate) synchronous: SQLiteSynchronous,
  95. pub(crate) busy_timeout: i32,
  96. #[allow(dead_code)]
  97. pub(crate) secure_delete: bool,
  98. }
  99. impl Default for DatabaseCustomizerConfig {
  100. fn default() -> Self {
  101. Self {
  102. journal_mode: SQLiteJournalMode::WAL,
  103. synchronous: SQLiteSynchronous::NORMAL,
  104. busy_timeout: 5000,
  105. secure_delete: true,
  106. }
  107. }
  108. }
  109. #[derive(Debug)]
  110. struct DatabaseCustomizer {
  111. config: DatabaseCustomizerConfig,
  112. }
  113. impl DatabaseCustomizer {
  114. fn new(config: DatabaseCustomizerConfig) -> Self
  115. where
  116. Self: Sized,
  117. {
  118. Self { config }
  119. }
  120. }
  121. impl CustomizeConnection<SqliteConnection, crate::Error> for DatabaseCustomizer {
  122. fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<()> {
  123. conn.pragma_set_busy_timeout(self.config.busy_timeout)?;
  124. if self.config.journal_mode != SQLiteJournalMode::WAL {
  125. conn.pragma_set_journal_mode(self.config.journal_mode, None)?;
  126. }
  127. conn.pragma_set_synchronous(self.config.synchronous, None)?;
  128. Ok(())
  129. }
  130. }