@@ -20,10 +20,11 @@ cfg_replication!(
2020
2121cfg_sync ! {
2222 use crate :: sync:: SyncContext ;
23+ use tokio:: sync:: Mutex ;
24+ use std:: sync:: Arc ;
2325}
2426
25- use crate :: { database:: OpenFlags , local:: connection:: Connection } ;
26- use crate :: { Error :: ConnectionFailed , Result } ;
27+ use crate :: { database:: OpenFlags , local:: connection:: Connection , Error :: ConnectionFailed , Result } ;
2728use libsql_sys:: ffi;
2829
2930// A libSQL database.
@@ -33,7 +34,7 @@ pub struct Database {
3334 #[ cfg( feature = "replication" ) ]
3435 pub replication_ctx : Option < ReplicationContext > ,
3536 #[ cfg( feature = "sync" ) ]
36- pub sync_ctx : Option < tokio :: sync :: Mutex < SyncContext > > ,
37+ pub sync_ctx : Option < Arc < Mutex < SyncContext > > > ,
3738}
3839
3940impl Database {
@@ -222,7 +223,7 @@ impl Database {
222223
223224 let sync_ctx =
224225 SyncContext :: new ( connector, db_path. into ( ) , endpoint, Some ( auth_token) ) . await ?;
225- db. sync_ctx = Some ( tokio :: sync :: Mutex :: new ( sync_ctx) ) ;
226+ db. sync_ctx = Some ( Arc :: new ( Mutex :: new ( sync_ctx) ) ) ;
226227
227228 Ok ( db)
228229 }
@@ -463,137 +464,10 @@ impl Database {
463464 #[ cfg( feature = "sync" ) ]
464465 /// Sync WAL frames to remote.
465466 pub async fn sync_offline ( & self ) -> Result < crate :: database:: Replicated > {
466- use crate :: sync:: SyncError ;
467- use crate :: Error ;
468-
469467 let mut sync_ctx = self . sync_ctx . as_ref ( ) . unwrap ( ) . lock ( ) . await ;
470468 let conn = self . connect ( ) ?;
471469
472- let durable_frame_no = sync_ctx. durable_frame_num ( ) ;
473- let max_frame_no = conn. wal_frame_count ( ) ;
474-
475- if max_frame_no > durable_frame_no {
476- match self . try_push ( & mut sync_ctx, & conn) . await {
477- Ok ( rep) => Ok ( rep) ,
478- Err ( Error :: Sync ( err) ) => {
479- // Retry the sync because we are ahead of the server and we need to push some older
480- // frames.
481- if let Some ( SyncError :: InvalidPushFrameNoLow ( _, _) ) = err. downcast_ref ( ) {
482- tracing:: debug!( "got InvalidPushFrameNo, retrying push" ) ;
483- self . try_push ( & mut sync_ctx, & conn) . await
484- } else {
485- Err ( Error :: Sync ( err) )
486- }
487- }
488- Err ( e) => Err ( e) ,
489- }
490- } else {
491- self . try_pull ( & mut sync_ctx, & conn) . await
492- }
493- . or_else ( |err| {
494- let Error :: Sync ( err) = err else {
495- return Err ( err) ;
496- } ;
497-
498- // TODO(levy): upcasting should be done *only* at the API boundary, doing this in
499- // internal code just sucks.
500- let Some ( SyncError :: HttpDispatch ( _) ) = err. downcast_ref ( ) else {
501- return Err ( Error :: Sync ( err) ) ;
502- } ;
503-
504- Ok ( crate :: database:: Replicated {
505- frame_no : None ,
506- frames_synced : 0 ,
507- } )
508- } )
509- }
510-
511- #[ cfg( feature = "sync" ) ]
512- async fn try_push (
513- & self ,
514- sync_ctx : & mut SyncContext ,
515- conn : & Connection ,
516- ) -> Result < crate :: database:: Replicated > {
517- let page_size = {
518- let rows = conn
519- . query ( "PRAGMA page_size" , crate :: params:: Params :: None ) ?
520- . unwrap ( ) ;
521- let row = rows. next ( ) ?. unwrap ( ) ;
522- let page_size = row. get :: < u32 > ( 0 ) ?;
523- page_size
524- } ;
525-
526- let max_frame_no = conn. wal_frame_count ( ) ;
527- if max_frame_no == 0 {
528- return Ok ( crate :: database:: Replicated {
529- frame_no : None ,
530- frames_synced : 0 ,
531- } ) ;
532- }
533-
534- let generation = sync_ctx. generation ( ) ; // TODO: Probe from WAL.
535- let start_frame_no = sync_ctx. durable_frame_num ( ) + 1 ;
536- let end_frame_no = max_frame_no;
537-
538- let mut frame_no = start_frame_no;
539- while frame_no <= end_frame_no {
540- let frame = conn. wal_get_frame ( frame_no, page_size) ?;
541-
542- // The server returns its maximum frame number. To avoid resending
543- // frames the server already knows about, we need to update the
544- // frame number to the one returned by the server.
545- let max_frame_no = sync_ctx
546- . push_one_frame ( frame. freeze ( ) , generation, frame_no)
547- . await ?;
548-
549- if max_frame_no > frame_no {
550- frame_no = max_frame_no;
551- }
552- frame_no += 1 ;
553- }
554-
555- sync_ctx. write_metadata ( ) . await ?;
556-
557- // TODO(lucio): this can underflow if the server previously returned a higher max_frame_no
558- // than what we have stored here.
559- let frame_count = end_frame_no - start_frame_no + 1 ;
560- Ok ( crate :: database:: Replicated {
561- frame_no : None ,
562- frames_synced : frame_count as usize ,
563- } )
564- }
565-
566- #[ cfg( feature = "sync" ) ]
567- async fn try_pull (
568- & self ,
569- sync_ctx : & mut SyncContext ,
570- conn : & Connection ,
571- ) -> Result < crate :: database:: Replicated > {
572- let generation = sync_ctx. generation ( ) ;
573- let mut frame_no = sync_ctx. durable_frame_num ( ) + 1 ;
574-
575- let insert_handle = conn. wal_insert_handle ( ) ?;
576-
577- loop {
578- match sync_ctx. pull_one_frame ( generation, frame_no) . await {
579- Ok ( Some ( frame) ) => {
580- insert_handle. insert ( & frame) ?;
581- frame_no += 1 ;
582- }
583- Ok ( None ) => {
584- sync_ctx. write_metadata ( ) . await ?;
585- return Ok ( crate :: database:: Replicated {
586- frame_no : None ,
587- frames_synced : 1 ,
588- } ) ;
589- }
590- Err ( err) => {
591- tracing:: debug!( "pull_one_frame error: {:?}" , err) ;
592- sync_ctx. write_metadata ( ) . await ?;
593- return Err ( err) ;
594- }
595- }
596- }
470+ crate :: sync:: sync_offline ( & mut sync_ctx, & conn) . await
597471 }
598472
599473 pub ( crate ) fn path ( & self ) -> & str {
0 commit comments