@@ -2,6 +2,7 @@ use std::collections::BTreeMap;
22use std:: fmt;
33use std:: future:: Future ;
44use std:: path:: PathBuf ;
5+ use std:: pin:: Pin ;
56use std:: str:: FromStr ;
67use std:: sync:: Arc ;
78
@@ -121,6 +122,14 @@ impl fmt::Display for SegmentKey {
121122 }
122123}
123124
125+ /// takes the new durable frame_no and returns a future
126+ pub type OnStoreCallback = Box <
127+ dyn FnOnce ( u64 ) -> Pin < Box < dyn Future < Output = ( ) > + Send + Sync + ' static > >
128+ + Send
129+ + Sync
130+ + ' static ,
131+ > ;
132+
124133pub trait Storage : Send + Sync + ' static {
125134 type Segment : Segment ;
126135 type Config ;
@@ -133,7 +142,8 @@ pub trait Storage: Send + Sync + 'static {
133142 namespace : & NamespaceName ,
134143 seg : Self :: Segment ,
135144 config_override : Option < Arc < Self :: Config > > ,
136- ) -> impl Future < Output = u64 > + Send + Sync + ' static ;
145+ on_store : OnStoreCallback ,
146+ ) ;
137147
138148 fn durable_frame_no_sync (
139149 & self ,
@@ -190,8 +200,8 @@ impl Storage for NoStorage {
190200 _namespace : & NamespaceName ,
191201 _seg : Self :: Segment ,
192202 _config : Option < Arc < Self :: Config > > ,
193- ) -> impl Future < Output = u64 > + Send + Sync + ' static {
194- std :: future :: ready ( u64 :: MAX )
203+ _on_store : OnStoreCallback ,
204+ ) {
195205 }
196206
197207 async fn durable_frame_no (
@@ -286,12 +296,15 @@ impl<IO: Io> TestStorage<IO> {
286296 pub fn new_io ( store : bool , io : IO ) -> Self {
287297 let dir = tempdir ( ) . unwrap ( ) ;
288298 Self {
289- inner : Arc :: new ( TestStorageInner {
290- dir,
291- stored : Default :: default ( ) ,
292- io,
293- store,
294- } . into ( ) ) ,
299+ inner : Arc :: new (
300+ TestStorageInner {
301+ dir,
302+ stored : Default :: default ( ) ,
303+ io,
304+ store,
305+ }
306+ . into ( ) ,
307+ ) ,
295308 }
296309 }
297310}
@@ -305,14 +318,17 @@ impl<IO: Io> Storage for TestStorage<IO> {
305318 namespace : & NamespaceName ,
306319 seg : Self :: Segment ,
307320 _config : Option < Arc < Self :: Config > > ,
308- ) -> impl Future < Output = u64 > + Send + Sync + ' static {
321+ on_store : OnStoreCallback ,
322+ ) {
309323 let mut inner = self . inner . lock ( ) ;
310324 if inner. store {
311325 let id = uuid:: Uuid :: new_v4 ( ) ;
312326 let out_path = inner. dir . path ( ) . join ( id. to_string ( ) ) ;
313327 let out_file = inner. io . open ( true , true , true , & out_path) . unwrap ( ) ;
314- let index = tokio:: runtime:: Handle :: current ( ) . block_on ( seg. compact ( & out_file, id) ) . unwrap ( ) ;
315- let end_frame_no = seg. header ( ) . last_committed ( ) ;
328+ let index = tokio:: runtime:: Handle :: current ( )
329+ . block_on ( seg. compact ( & out_file, id) )
330+ . unwrap ( ) ;
331+ let end_frame_no = seg. header ( ) . last_committed ( ) ;
316332 let key = SegmentKey {
317333 start_frame_no : seg. header ( ) . start_frame_no . get ( ) ,
318334 end_frame_no,
@@ -323,9 +339,7 @@ impl<IO: Io> Storage for TestStorage<IO> {
323339 . entry ( namespace. clone ( ) )
324340 . or_default ( )
325341 . insert ( key, ( out_path, index) ) ;
326- std:: future:: ready ( end_frame_no)
327- } else {
328- std:: future:: ready ( u64:: MAX )
342+ tokio:: runtime:: Handle :: current ( ) . block_on ( on_store ( end_frame_no) )
329343 }
330344 }
331345
@@ -354,12 +368,10 @@ impl<IO: Io> Storage for TestStorage<IO> {
354368 ) -> u64 {
355369 let inner = self . inner . lock ( ) ;
356370 if inner. store {
357- let Some ( segs) = inner. stored . get ( namespace) else { return 0 } ;
358- segs
359- . keys ( )
360- . map ( |k| k. end_frame_no )
361- . max ( )
362- . unwrap_or ( 0 )
371+ let Some ( segs) = inner. stored . get ( namespace) else {
372+ return 0 ;
373+ } ;
374+ segs. keys ( ) . map ( |k| k. end_frame_no ) . max ( ) . unwrap_or ( 0 )
363375 } else {
364376 u64:: MAX
365377 }
@@ -374,9 +386,10 @@ impl<IO: Io> Storage for TestStorage<IO> {
374386 let inner = self . inner . lock ( ) ;
375387 if inner. store {
376388 if let Some ( segs) = inner. stored . get ( namespace) {
377- let Some ( ( key, _path) ) = segs. iter ( ) . find ( |( k, _) | k. includes ( frame_no) )
378- else { return Err ( Error :: FrameNotFound ( frame_no) ) } ;
379- return Ok ( * key)
389+ let Some ( ( key, _path) ) = segs. iter ( ) . find ( |( k, _) | k. includes ( frame_no) ) else {
390+ return Err ( Error :: FrameNotFound ( frame_no) ) ;
391+ } ;
392+ return Ok ( * key) ;
380393 } else {
381394 panic ! ( "namespace not found" ) ;
382395 }
@@ -394,12 +407,9 @@ impl<IO: Io> Storage for TestStorage<IO> {
394407 let inner = self . inner . lock ( ) ;
395408 if inner. store {
396409 match inner. stored . get ( namespace) {
397- Some ( segs) => {
398- Ok ( segs. get ( & key) . unwrap ( ) . 1 . clone ( ) )
399- }
410+ Some ( segs) => Ok ( segs. get ( & key) . unwrap ( ) . 1 . clone ( ) ) ,
400411 None => panic ! ( "unknown namespace" ) ,
401412 }
402-
403413 } else {
404414 panic ! ( "not storing" )
405415 }
@@ -421,14 +431,12 @@ impl<IO: Io> Storage for TestStorage<IO> {
421431 }
422432 None => panic ! ( "unknown namespace" ) ,
423433 }
424-
425434 } else {
426435 panic ! ( "not storing" )
427436 }
428437 }
429438}
430439
431- #[ derive( Debug ) ]
432440pub struct StoreSegmentRequest < C , S > {
433441 namespace : NamespaceName ,
434442 /// Path to the segment. Read-only for bottomless
@@ -439,4 +447,21 @@ pub struct StoreSegmentRequest<C, S> {
439447 /// alternative configuration to use with the storage layer.
440448 /// e.g: S3 overrides
441449 storage_config_override : Option < Arc < C > > ,
450+ /// Called after the segment was stored, with the new durable index
451+ on_store_callback : OnStoreCallback ,
452+ }
453+
454+ impl < C , S > fmt:: Debug for StoreSegmentRequest < C , S >
455+ where
456+ C : fmt:: Debug ,
457+ S : fmt:: Debug ,
458+ {
459+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
460+ f. debug_struct ( "StoreSegmentRequest" )
461+ . field ( "namespace" , & self . namespace )
462+ . field ( "segment" , & self . segment )
463+ . field ( "created_at" , & self . created_at )
464+ . field ( "storage_config_override" , & self . storage_config_override )
465+ . finish ( )
466+ }
442467}
0 commit comments