@@ -23,9 +23,9 @@ use super::{OnStoreCallback, RestoreOptions, Storage, StoreSegmentRequest};
2323///
2424/// On shutdown, attempts to empty the queue, and flush the receiver. When the last handle of the
2525/// receiver is dropped, and the queue is empty, exit.
26- pub struct AsyncStorageLoop < B , IO : Io , S > {
27- receiver : mpsc:: UnboundedReceiver < StorageLoopMessage < S > > ,
28- scheduler : Scheduler < S > ,
26+ pub struct AsyncStorageLoop < B : Backend , IO : Io , S > {
27+ receiver : mpsc:: UnboundedReceiver < StorageLoopMessage < S , B :: Config > > ,
28+ scheduler : Scheduler < S , B :: Config > ,
2929 backend : Arc < B > ,
3030 io : Arc < IO > ,
3131 max_in_flight : usize ,
5050 pub async fn run ( mut self ) {
5151 let mut shutting_down = false ;
5252 let mut in_flight_futs = JoinSet :: new ( ) ;
53+ let mut notify_shutdown = None ;
5354 // run the loop until shutdown.
5455 loop {
5556 if shutting_down && self . scheduler . is_empty ( ) {
9293 Some ( StorageLoopMessage :: DurableFrameNoReq { namespace, ret, config_override } ) => {
9394 self . fetch_durable_frame_no_async( namespace, ret, config_override) ;
9495 }
96+ Some ( StorageLoopMessage :: Shutdown ( ret) ) => {
97+ notify_shutdown. replace( ret) ;
98+ shutting_down = true ;
99+ tracing:: info!( "Storage shutting down" ) ;
100+ }
95101 None => {
96102 shutting_down = true ;
97103 }
@@ -108,6 +114,10 @@ where
108114 }
109115 }
110116 }
117+ tracing:: info!( "Storage shutdown" ) ;
118+ if let Some ( notify) = notify_shutdown {
119+ let _ = notify. send ( ( ) ) ;
120+ }
111121 }
112122
113123 fn fetch_durable_frame_no_async (
@@ -147,18 +157,19 @@ pub struct BottomlessConfig<C> {
147157 pub config : C ,
148158}
149159
150- enum StorageLoopMessage < S > {
151- StoreReq ( StoreSegmentRequest < S > ) ,
160+ enum StorageLoopMessage < S , C > {
161+ StoreReq ( StoreSegmentRequest < S , C > ) ,
152162 DurableFrameNoReq {
153163 namespace : NamespaceName ,
154- config_override : Option < Arc < dyn Any + Send + Sync > > ,
164+ config_override : Option < C > ,
155165 ret : oneshot:: Sender < super :: Result < u64 > > ,
156166 } ,
167+ Shutdown ( oneshot:: Sender < ( ) > ) ,
157168}
158169
159170pub struct AsyncStorage < B , S > {
160171 /// send request to the main loop
161- job_sender : mpsc:: UnboundedSender < StorageLoopMessage < S > > ,
172+ job_sender : mpsc:: UnboundedSender < StorageLoopMessage < S , B :: Config > > ,
162173 force_shutdown : oneshot:: Sender < ( ) > ,
163174 backend : Arc < B > ,
164175}
@@ -171,6 +182,12 @@ where
171182 type Segment = S ;
172183 type Config = B :: Config ;
173184
185+ async fn shutdown ( & self ) {
186+ let ( snd, rcv) = oneshot:: channel ( ) ;
187+ let _ = self . job_sender . send ( StorageLoopMessage :: Shutdown ( snd) ) ;
188+ let _ = rcv. await ;
189+ }
190+
174191 fn store (
175192 & self ,
176193 namespace : & NamespaceName ,
0 commit comments