1- use std:: path:: { Path , PathBuf } ;
1+ use std:: path:: Path ;
22use std:: sync:: Weak ;
33use std:: sync:: { atomic:: AtomicBool , Arc } ;
44use std:: time:: Duration ;
@@ -50,7 +50,11 @@ pub(super) async fn make_primary_connection_maker(
5050 broadcaster : BroadcasterHandle ,
5151 make_wal_manager : Arc < dyn Fn ( ) -> InnerWalManager + Sync + Send + ' static > ,
5252 encryption_config : Option < EncryptionConfig > ,
53- ) -> crate :: Result < ( PrimaryConnectionMaker , ReplicationWalWrapper , Arc < Stats > ) > {
53+ ) -> crate :: Result < (
54+ Arc < PrimaryConnectionMaker > ,
55+ ReplicationWalWrapper ,
56+ Arc < Stats > ,
57+ ) > {
5458 let db_config = meta_store_handle. get ( ) ;
5559 let bottomless_db_id = NamespaceBottomlessDbId :: from_config ( & db_config) ;
5660 // FIXME: figure how to to it per-db
@@ -115,7 +119,6 @@ pub(super) async fn make_primary_connection_maker(
115119 base_config. stats_sender . clone ( ) ,
116120 name. clone ( ) ,
117121 logger. new_frame_notifier . subscribe ( ) ,
118- base_config. encryption_config . clone ( ) ,
119122 )
120123 . await ?;
121124
@@ -152,6 +155,11 @@ pub(super) async fn make_primary_connection_maker(
152155
153156 tracing:: debug!( "Completed opening libsql connection" ) ;
154157
158+ join_set. spawn ( run_storage_monitor (
159+ Arc :: downgrade ( & stats) ,
160+ connection_maker. clone ( ) ,
161+ ) ) ;
162+
155163 // this must happen after we create the connection maker. The connection maker old on a
156164 // connection to ensure that no other connection is closing while we try to open the dump.
157165 // that would cause a SQLITE_LOCKED error.
@@ -336,7 +344,6 @@ pub(super) async fn make_stats(
336344 stats_sender : StatsSender ,
337345 name : NamespaceName ,
338346 mut current_frame_no : watch:: Receiver < Option < FrameNo > > ,
339- encryption_config : Option < EncryptionConfig > ,
340347) -> anyhow:: Result < Arc < Stats > > {
341348 tracing:: debug!( "creating stats type" ) ;
342349 let stats = Stats :: new ( name. clone ( ) , db_path, join_set) . await ?;
@@ -363,12 +370,6 @@ pub(super) async fn make_stats(
363370 }
364371 } ) ;
365372
366- join_set. spawn ( run_storage_monitor (
367- db_path. into ( ) ,
368- Arc :: downgrade ( & stats) ,
369- encryption_config,
370- ) ) ;
371-
372373 tracing:: debug!( "done sending stats, and creating bg tasks" ) ;
373374
374375 Ok ( stats)
@@ -377,44 +378,48 @@ pub(super) async fn make_stats(
377378// Periodically check the storage used by the database and save it in the Stats structure.
378379// TODO: Once we have a separate fiber that does WAL checkpoints, running this routine
379380// right after checkpointing is exactly where it should be done.
380- async fn run_storage_monitor (
381- db_path : PathBuf ,
381+ pub ( crate ) async fn run_storage_monitor < M : MakeConnection > (
382382 stats : Weak < Stats > ,
383- encryption_config : Option < EncryptionConfig > ,
383+ connection_maker : Arc < M > ,
384384) -> anyhow:: Result < ( ) > {
385385 // on initialization, the database file doesn't exist yet, so we wait a bit for it to be
386386 // created
387387 tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
388388
389389 let duration = tokio:: time:: Duration :: from_secs ( 60 ) ;
390- let db_path: Arc < Path > = db_path. into ( ) ;
391390 loop {
392- let db_path = db_path. clone ( ) ;
393391 let Some ( stats) = stats. upgrade ( ) else {
394392 return Ok ( ( ) ) ;
395393 } ;
396394
397- let encryption_config = encryption_config. clone ( ) ;
398- let _ = tokio:: task:: spawn_blocking ( move || {
399- // because closing the last connection interferes with opening a new one, we lazily
400- // initialize a connection here, and keep it alive for the entirety of the program. If we
401- // fail to open it, we wait for `duration` and try again later.
402- match open_conn ( & db_path, Sqlite3WalManager :: new ( ) , Some ( rusqlite:: OpenFlags :: SQLITE_OPEN_READ_ONLY ) , encryption_config) {
403- Ok ( mut conn) => {
404- if let Ok ( tx) = conn. transaction ( ) {
405- let page_count = tx. query_row ( "pragma page_count;" , [ ] , |row| { row. get :: < usize , u64 > ( 0 ) } ) ;
406- let freelist_count = tx. query_row ( "pragma freelist_count;" , [ ] , |row| { row. get :: < usize , u64 > ( 0 ) } ) ;
407- if let ( Ok ( page_count) , Ok ( freelist_count) ) = ( page_count, freelist_count) {
408- let storage_bytes_used = ( page_count - freelist_count) * 4096 ;
409- stats. set_storage_bytes_used ( storage_bytes_used) ;
410- }
411- }
412- } ,
413- Err ( e) => {
414- tracing:: warn!( "failed to open connection for storager monitor: {e}, trying again in {duration:?}" ) ;
415- } ,
395+ match connection_maker. create ( ) . await {
396+ Ok ( conn) => {
397+ let _ = BLOCKING_RT
398+ . spawn_blocking ( move || {
399+ conn. with_raw ( |conn| {
400+ if let Ok ( tx) = conn. transaction ( ) {
401+ let page_count = tx. query_row ( "pragma page_count;" , [ ] , |row| {
402+ row. get :: < usize , u64 > ( 0 )
403+ } ) ;
404+ let freelist_count =
405+ tx. query_row ( "pragma freelist_count;" , [ ] , |row| {
406+ row. get :: < usize , u64 > ( 0 )
407+ } ) ;
408+ if let ( Ok ( page_count) , Ok ( freelist_count) ) =
409+ ( page_count, freelist_count)
410+ {
411+ let storage_bytes_used = ( page_count - freelist_count) * 4096 ;
412+ stats. set_storage_bytes_used ( storage_bytes_used) ;
413+ }
414+ }
415+ } )
416+ } )
417+ . await ;
418+ }
419+ Err ( e) => {
420+ tracing:: warn!( "failed to open connection for storager monitor: {e}, trying again in {duration:?}" ) ;
416421 }
417- } ) . await ;
422+ }
418423
419424 tokio:: time:: sleep ( duration) . await ;
420425 }
0 commit comments