@@ -120,7 +120,7 @@ impl<IO: Io> S3Backend<IO> {
120120 Ok ( stream. into_async_read ( ) )
121121 }
122122
123- async fn fetch_segment_data (
123+ async fn fetch_segment_data_inner (
124124 & self ,
125125 config : & S3Config ,
126126 folder_key : & FolderKey < ' _ > ,
@@ -158,12 +158,12 @@ impl<IO: Io> S3Backend<IO> {
158158 Ok ( ( ) )
159159 }
160160
161- async fn fetch_segment_index (
161+ async fn fetch_segment_index_inner (
162162 & self ,
163163 config : & S3Config ,
164164 folder_key : & FolderKey < ' _ > ,
165165 segment_key : & SegmentKey ,
166- ) -> Result < fst:: Map < Vec < u8 > > > {
166+ ) -> Result < fst:: Map < Arc < [ u8 ] > > > {
167167 let s3_index_key = s3_segment_index_key ( folder_key, segment_key) ;
168168 let mut stream = self . s3_get ( config, s3_index_key) . await ?. into_async_read ( ) ;
169169 let mut header: SegmentIndexHeader = SegmentIndexHeader :: new_zeroed ( ) ;
@@ -177,12 +177,13 @@ impl<IO: Io> S3Backend<IO> {
177177 if checksum != header. checksum . get ( ) {
178178 return Err ( Error :: InvalidIndex ( "invalid index data checksum" ) ) ;
179179 }
180- let index = fst:: Map :: new ( data) . map_err ( |_| Error :: InvalidIndex ( "invalid index bytes" ) ) ?;
180+ let index =
181+ fst:: Map :: new ( data. into ( ) ) . map_err ( |_| Error :: InvalidIndex ( "invalid index bytes" ) ) ?;
181182 Ok ( index)
182183 }
183184
184185 /// Find the most recent, and biggest segment that may contain `frame_no`
185- async fn find_segment (
186+ async fn find_segment_inner (
186187 & self ,
187188 config : & S3Config ,
188189 folder_key : & FolderKey < ' _ > ,
@@ -227,7 +228,10 @@ impl<IO: Io> S3Backend<IO> {
227228 cluster_id : & config. cluster_id ,
228229 namespace,
229230 } ;
230- let Some ( latest_key) = self . find_segment ( config, & folder_key, u64:: MAX ) . await ? else {
231+ let Some ( latest_key) = self
232+ . find_segment_inner ( config, & folder_key, u64:: MAX )
233+ . await ?
234+ else {
231235 tracing:: info!( "nothing to restore for {namespace}" ) ;
232236 return Ok ( ( ) ) ;
233237 } ;
@@ -262,7 +266,7 @@ impl<IO: Io> S3Backend<IO> {
262266
263267 let next_frame_no = header. start_frame_no . get ( ) - 1 ;
264268 let Some ( key) = self
265- . find_segment ( config, & folder_key, next_frame_no)
269+ . find_segment_inner ( config, & folder_key, next_frame_no)
266270 . await ?
267271 else {
268272 todo ! ( "there should be a segment!" ) ;
@@ -283,10 +287,10 @@ impl<IO: Io> S3Backend<IO> {
283287 folder_key : & FolderKey < ' _ > ,
284288 segment_key : & SegmentKey ,
285289 dest_file : & impl FileExt ,
286- ) -> Result < fst:: Map < Vec < u8 > > > {
290+ ) -> Result < fst:: Map < Arc < [ u8 ] > > > {
287291 let ( _, index) = tokio:: try_join!(
288- self . fetch_segment_data ( config, & folder_key, & segment_key, dest_file) ,
289- self . fetch_segment_index ( config, & folder_key, & segment_key) ,
292+ self . fetch_segment_data_inner ( config, & folder_key, & segment_key, dest_file) ,
293+ self . fetch_segment_index_inner ( config, & folder_key, & segment_key) ,
290294 ) ?;
291295
292296 Ok ( index)
@@ -378,16 +382,19 @@ where
378382 namespace : & NamespaceName ,
379383 frame_no : u64 ,
380384 dest_path : & Path ,
381- ) -> Result < fst:: Map < Vec < u8 > > > {
385+ ) -> Result < fst:: Map < Arc < [ u8 ] > > > {
382386 let folder_key = FolderKey {
383387 cluster_id : & config. cluster_id ,
384388 namespace : & namespace,
385389 } ;
386-
387- let Some ( segment_key) = self . find_segment ( config, & folder_key, frame_no) . await ? else {
390+
391+ let Some ( segment_key) = self
392+ . find_segment_inner ( config, & folder_key, frame_no)
393+ . await ?
394+ else {
388395 return Err ( Error :: FrameNotFound ( frame_no) ) ;
389396 } ;
390-
397+
391398 if segment_key. includes ( frame_no) {
392399 // TODO: make open async
393400 let file = self . io . open ( false , false , true , dest_path) ?;
@@ -403,13 +410,15 @@ where
403410 config : & Self :: Config ,
404411 namespace : & NamespaceName ,
405412 ) -> Result < super :: DbMeta > {
406- // request a key bigger than any other to get the last segment
407413 let folder_key = FolderKey {
408414 cluster_id : & config. cluster_id ,
409415 namespace : & namespace,
410416 } ;
411417
412- let max_segment_key = self . find_segment ( config, & folder_key, u64:: MAX ) . await ?;
418+ // request a key bigger than any other to get the last segment
419+ let max_segment_key = self
420+ . find_segment_inner ( config, & folder_key, u64:: MAX )
421+ . await ?;
413422
414423 Ok ( super :: DbMeta {
415424 max_frame_no : max_segment_key. map ( |s| s. end_frame_no ) . unwrap_or ( 0 ) ,
@@ -432,6 +441,60 @@ where
432441 RestoreOptions :: Timestamp ( _) => todo ! ( ) ,
433442 }
434443 }
444+
445+ async fn find_segment (
446+ & self ,
447+ config : & Self :: Config ,
448+ namespace : & NamespaceName ,
449+ frame_no : u64 ,
450+ ) -> Result < SegmentKey > {
451+ let folder_key = FolderKey {
452+ cluster_id : & config. cluster_id ,
453+ namespace : & namespace,
454+ } ;
455+ self . find_segment_inner ( config, & folder_key, frame_no)
456+ . await ?
457+ . ok_or_else ( || Error :: FrameNotFound ( frame_no) )
458+ }
459+
460+ async fn fetch_segment_index (
461+ & self ,
462+ config : & Self :: Config ,
463+ namespace : & NamespaceName ,
464+ key : & SegmentKey ,
465+ ) -> Result < fst:: Map < Arc < [ u8 ] > > > {
466+ let folder_key = FolderKey {
467+ cluster_id : & config. cluster_id ,
468+ namespace : & namespace,
469+ } ;
470+ self . fetch_segment_index_inner ( config, & folder_key, key) . await
471+ }
472+
473+ async fn fetch_segment_data_to_file (
474+ & self ,
475+ config : & Self :: Config ,
476+ namespace : & NamespaceName ,
477+ key : & SegmentKey ,
478+ file : & impl FileExt ,
479+ ) -> Result < ( ) > {
480+ let folder_key = FolderKey {
481+ cluster_id : & config. cluster_id ,
482+ namespace : & namespace,
483+ } ;
484+ self . fetch_segment_data_inner ( config, & folder_key, key, file) . await ?;
485+ Ok ( ( ) )
486+ }
487+
488+ async fn fetch_segment_data (
489+ self : Arc < Self > ,
490+ config : Arc < Self :: Config > ,
491+ namespace : NamespaceName ,
492+ key : SegmentKey ,
493+ ) -> Result < impl FileExt > {
494+ let file = self . io . tempfile ( ) ?;
495+ self . fetch_segment_data_to_file ( & config, & namespace, & key, & file) . await ?;
496+ Ok ( file)
497+ }
435498}
436499
437500#[ derive( Clone , Copy ) ]
0 commit comments