@@ -55,6 +55,7 @@ use url::Url;
5555use utils:: services:: idle_shutdown:: IdleShutdownKicker ;
5656
5757use self :: config:: MetaStoreConfig ;
58+ use self :: connection:: connection_manager:: InnerWalManager ;
5859use self :: namespace:: configurator:: {
5960 BaseNamespaceConfig , NamespaceConfigurators , PrimaryConfigurator , PrimaryExtraConfig ,
6061 ReplicaConfigurator , SchemaConfigurator ,
@@ -336,7 +337,8 @@ where
336337 config. heartbeat_url. as_deref( ) . unwrap_or( "<not supplied>" ) ,
337338 config. heartbeat_period,
338339 ) ;
339- join_set. spawn ( {
340+
341+ self . spawn_until_shutdown_on ( join_set, {
340342 let heartbeat_auth = config. heartbeat_auth . clone ( ) ;
341343 let heartbeat_period = config. heartbeat_period ;
342344 let heartbeat_url = if let Some ( url) = & config. heartbeat_url {
@@ -428,7 +430,7 @@ where
428430 let ( scripted_backup, script_backup_task) =
429431 ScriptBackupManager :: new ( & self . path , CommandHandler :: new ( command. to_string ( ) ) )
430432 . await ?;
431- self . spawn_until_shutdown ( & mut join_set, script_backup_task. run ( ) ) ;
433+ self . spawn_until_shutdown_on ( & mut join_set, script_backup_task. run ( ) ) ;
432434 Some ( scripted_backup)
433435 }
434436 None => None ,
@@ -484,7 +486,6 @@ where
484486 )
485487 . await ?;
486488
487-
488489 self . spawn_monitoring_tasks ( & mut join_set, stats_receiver) ?;
489490
490491 // if namespaces are enabled, then bottomless must have set DB ID
@@ -501,7 +502,7 @@ where
501502 let proxy_service =
502503 ProxyService :: new ( namespace_store. clone ( ) , None , self . disable_namespaces ) ;
503504 // Garbage collect proxy clients every 30 seconds
504- self . spawn_until_shutdown ( & mut join_set, {
505+ self . spawn_until_shutdown_on ( & mut join_set, {
505506 let clients = proxy_service. clients ( ) ;
506507 async move {
507508 loop {
@@ -511,14 +512,17 @@ where
511512 }
512513 } ) ;
513514
514- self . spawn_until_shutdown ( & mut join_set, run_rpc_server (
515- proxy_service,
516- config. acceptor ,
517- config. tls_config ,
518- idle_shutdown_kicker. clone ( ) ,
519- namespace_store. clone ( ) ,
520- self . disable_namespaces ,
521- ) ) ;
515+ self . spawn_until_shutdown_on (
516+ & mut join_set,
517+ run_rpc_server (
518+ proxy_service,
519+ config. acceptor ,
520+ config. tls_config ,
521+ idle_shutdown_kicker. clone ( ) ,
522+ namespace_store. clone ( ) ,
523+ self . disable_namespaces ,
524+ ) ,
525+ ) ;
522526 }
523527
524528 let shutdown_timeout = self . shutdown_timeout . clone ( ) ;
@@ -530,7 +534,7 @@ where
530534 // The migration scheduler is only useful on the primary
531535 let meta_conn = metastore_conn_maker ( ) ?;
532536 let scheduler = Scheduler :: new ( namespace_store. clone ( ) , meta_conn) . await ?;
533- self . spawn_until_shutdown ( & mut join_set, async move {
537+ self . spawn_until_shutdown_on ( & mut join_set, async move {
534538 scheduler. run ( scheduler_receiver) . await ;
535539 Ok ( ( ) )
536540 } ) ;
@@ -560,7 +564,7 @@ where
560564 ) ;
561565
562566 // Garbage collect proxy clients every 30 seconds
563- self . spawn_until_shutdown ( & mut join_set, {
567+ self . spawn_until_shutdown_on ( & mut join_set, {
564568 let clients = proxy_svc. clients ( ) ;
565569 async move {
566570 loop {
@@ -583,8 +587,7 @@ where
583587 DatabaseKind :: Replica => {
584588 dbg ! ( ) ;
585589 let ( channel, uri) = client_config. clone ( ) . unwrap ( ) ;
586- let replication_svc =
587- ReplicationLogProxyService :: new ( channel. clone ( ) , uri. clone ( ) ) ;
590+ let replication_svc = ReplicationLogProxyService :: new ( channel. clone ( ) , uri. clone ( ) ) ;
588591 let proxy_svc = ReplicaProxyService :: new (
589592 channel,
590593 uri,
@@ -651,7 +654,12 @@ where
651654 match self . use_custom_wal {
652655 Some ( CustomWAL :: LibsqlWal ) => self . libsql_wal_configurators ( ) ,
653656 #[ cfg( feature = "durable-wal" ) ]
654- Some ( CustomWAL :: DurableWal ) => self . durable_wal_configurators ( ) ,
657+ Some ( CustomWAL :: DurableWal ) => self . durable_wal_configurators (
658+ base_config,
659+ scripted_backup,
660+ migration_scheduler_handle,
661+ client_config,
662+ ) ,
655663 None => {
656664 self . legacy_configurators (
657665 base_config,
@@ -669,11 +677,44 @@ where
669677 }
670678
671679 #[ cfg( feature = "durable-wal" ) ]
672- fn durable_wal_configurators ( & self ) -> anyhow:: Result < NamespaceConfigurators > {
673- todo ! ( ) ;
680+ fn durable_wal_configurators (
681+ & self ,
682+ base_config : BaseNamespaceConfig ,
683+ scripted_backup : Option < ScriptBackupManager > ,
684+ migration_scheduler_handle : SchedulerHandle ,
685+ client_config : Option < ( Channel , Uri ) > ,
686+ ) -> anyhow:: Result < NamespaceConfigurators > {
687+ tracing:: info!( "using durable wal" ) ;
688+ let lock_manager = Arc :: new ( std:: sync:: Mutex :: new ( LockManager :: new ( ) ) ) ;
689+ let namespace_resolver = |path : & Path | {
690+ NamespaceName :: from_string (
691+ path. parent ( )
692+ . unwrap ( )
693+ . file_name ( )
694+ . unwrap ( )
695+ . to_str ( )
696+ . unwrap ( )
697+ . to_string ( ) ,
698+ )
699+ . unwrap ( )
700+ . into ( )
701+ } ;
702+ let wal = DurableWalManager :: new (
703+ lock_manager,
704+ namespace_resolver,
705+ self . storage_server_address . clone ( ) ,
706+ ) ;
707+ let make_wal_manager = Arc :: new ( move || EitherWAL :: C ( wal. clone ( ) ) ) ;
708+ self . configurators_common (
709+ client_config,
710+ base_config,
711+ make_wal_manager,
712+ scripted_backup,
713+ migration_scheduler_handle,
714+ )
674715 }
675716
676- fn spawn_until_shutdown < F > ( & self , join_set : & mut JoinSet < anyhow:: Result < ( ) > > , fut : F )
717+ fn spawn_until_shutdown_on < F > ( & self , join_set : & mut JoinSet < anyhow:: Result < ( ) > > , fut : F )
677718 where
678719 F : Future < Output = anyhow:: Result < ( ) > > + Send + ' static ,
679720 {
@@ -694,6 +735,23 @@ where
694735 client_config : Option < ( Channel , Uri ) > ,
695736 ) -> anyhow:: Result < NamespaceConfigurators > {
696737 let make_wal_manager = Arc :: new ( || EitherWAL :: A ( Sqlite3WalManager :: default ( ) ) ) ;
738+ self . configurators_common (
739+ client_config,
740+ base_config,
741+ make_wal_manager,
742+ scripted_backup,
743+ migration_scheduler_handle,
744+ )
745+ }
746+
747+ fn configurators_common (
748+ & self ,
749+ client_config : Option < ( Channel , Uri ) > ,
750+ base_config : BaseNamespaceConfig ,
751+ make_wal_manager : Arc < dyn Fn ( ) -> InnerWalManager + Sync + Send + ' static > ,
752+ scripted_backup : Option < ScriptBackupManager > ,
753+ migration_scheduler_handle : SchedulerHandle ,
754+ ) -> anyhow:: Result < NamespaceConfigurators > {
697755 let mut configurators = NamespaceConfigurators :: empty ( ) ;
698756
699757 match client_config {
0 commit comments