@@ -121,10 +121,13 @@ where
121121 let notifier = self . checkpoint_notifier . clone ( ) ;
122122 let namespace = shared. namespace ( ) . clone ( ) ;
123123 let durable_frame_no = shared. durable_frame_no . clone ( ) ;
124- let cb: OnStoreCallback = Box :: new ( move |fno| Box :: pin ( async move {
125- update_durable ( fno, notifier, durable_frame_no, namespace) . await ;
126- } ) ) ;
127- self . storage . store ( & shared. namespace , sealed. clone ( ) , None , cb) ;
124+ let cb: OnStoreCallback = Box :: new ( move |fno| {
125+ Box :: pin ( async move {
126+ update_durable ( fno, notifier, durable_frame_no, namespace) . await ;
127+ } )
128+ } ) ;
129+ self . storage
130+ . store ( & shared. namespace , sealed. clone ( ) , None , cb) ;
128131 new. tail ( ) . push ( sealed) ;
129132 }
130133
@@ -140,7 +143,7 @@ async fn update_durable(
140143 notifier : mpsc:: Sender < CheckpointMessage > ,
141144 durable_frame_no_slot : Arc < Mutex < u64 > > ,
142145 namespace : NamespaceName ,
143- ) {
146+ ) {
144147 {
145148 let mut g = durable_frame_no_slot. lock ( ) ;
146149 if * g < new_durable {
@@ -180,23 +183,18 @@ where
180183 }
181184
182185 let action = match self . opened . entry ( namespace. clone ( ) ) {
183- dashmap:: Entry :: Occupied ( e) => {
184- match e. get ( ) {
185- Slot :: Wal ( shared) => return Ok ( shared. clone ( ) ) ,
186- Slot :: Building ( wait, _) => {
187- Err ( wait. clone ( ) )
188- } ,
189- }
186+ dashmap:: Entry :: Occupied ( e) => match e. get ( ) {
187+ Slot :: Wal ( shared) => return Ok ( shared. clone ( ) ) ,
188+ Slot :: Building ( wait, _) => Err ( wait. clone ( ) ) ,
190189 } ,
191190 dashmap:: Entry :: Vacant ( e) => {
192191 let notifier = Arc :: new ( ( Condvar :: new ( ) , Mutex :: new ( false ) ) ) ;
193192 let async_notifier = Arc :: new ( Notify :: new ( ) ) ;
194193 e. insert ( Slot :: Building ( notifier. clone ( ) , async_notifier. clone ( ) ) ) ;
195194 Ok ( ( notifier, async_notifier) )
196- } ,
195+ }
197196 } ;
198197
199-
200198 match action {
201199 Ok ( ( notifier, async_notifier) ) => {
202200 // if try_open succedded, then the slot was updated and contains the shared wal, if it
@@ -216,8 +214,8 @@ where
216214 cond. 0
217215 . wait_while ( & mut cond. 1 . lock ( ) , |ready : & mut bool | !* ready) ;
218216 // the slot was updated: try again
219- continue
220- } ,
217+ continue ;
218+ }
221219 }
222220 }
223221 }
@@ -254,11 +252,13 @@ where
254252 SealedSegment :: open ( file. into ( ) , entry. path ( ) . to_path_buf ( ) , Default :: default ( ) ) ?
255253 {
256254 let notifier = self . checkpoint_notifier . clone ( ) ;
257- let ns = namespace. clone ( ) ;
255+ let ns = namespace. clone ( ) ;
258256 let durable_frame_no = durable_frame_no. clone ( ) ;
259- let cb: OnStoreCallback = Box :: new ( move |fno| Box :: pin ( async move {
260- update_durable ( fno, notifier, durable_frame_no, ns) . await ;
261- } ) ) ;
257+ let cb: OnStoreCallback = Box :: new ( move |fno| {
258+ Box :: pin ( async move {
259+ update_durable ( fno, notifier, durable_frame_no, ns) . await ;
260+ } )
261+ } ) ;
262262 // TODO: pass config override here
263263 self . storage . store ( & namespace, sealed. clone ( ) , None , cb) ;
264264 tail. push ( sealed) ;
@@ -311,7 +311,8 @@ where
311311 shutdown : false . into ( ) ,
312312 } ) ;
313313
314- self . opened . insert ( namespace. clone ( ) , Slot :: Wal ( shared. clone ( ) ) ) ;
314+ self . opened
315+ . insert ( namespace. clone ( ) , Slot :: Wal ( shared. clone ( ) ) ) ;
315316
316317 return Ok ( shared) ;
317318 }
@@ -321,7 +322,6 @@ where
321322 pub async fn shutdown ( self : Arc < Self > ) -> Result < ( ) > {
322323 self . shutdown . store ( true , Ordering :: SeqCst ) ;
323324
324-
325325 let mut join_set = JoinSet :: < Result < ( ) > > :: new ( ) ;
326326 let semaphore = Arc :: new ( Semaphore :: new ( 8 ) ) ;
327327 for item in self . opened . iter ( ) {
@@ -347,20 +347,23 @@ where
347347
348348 Ok ( ( ) )
349349 } ) ;
350- break
351- } ,
350+ break ;
351+ }
352352 Slot :: Building ( _, notify) => {
353353 // wait for shared to finish building
354354 notify. notified ( ) . await ;
355- } ,
355+ }
356356 }
357357 }
358358 }
359359
360360 while join_set. join_next ( ) . await . is_some ( ) { }
361361
362362 // wait for checkpointer to exit
363- let _ = self . checkpoint_notifier . send ( CheckpointMessage :: Shutdown ) . await ;
363+ let _ = self
364+ . checkpoint_notifier
365+ . send ( CheckpointMessage :: Shutdown )
366+ . await ;
364367 self . checkpoint_notifier . closed ( ) . await ;
365368
366369 Ok ( ( ) )
0 commit comments