11//! S3 implementation of storage backend
22
33use std:: fmt;
4+ use std:: mem:: size_of;
45use std:: path:: Path ;
56use std:: pin:: Pin ;
67use std:: str:: FromStr ;
@@ -18,7 +19,8 @@ use libsql_sys::name::NamespaceName;
1819use roaring:: RoaringBitmap ;
1920use tokio:: io:: { AsyncRead , AsyncReadExt , BufReader } ;
2021use tokio_util:: sync:: ReusableBoxFuture ;
21- use zerocopy:: { AsBytes , FromZeroes } ;
22+ use zerocopy:: byteorder:: little_endian:: { U16 as lu16, U32 as lu32, U64 as lu64} ;
23+ use zerocopy:: { AsBytes , FromBytes , FromZeroes } ;
2224
2325use super :: { Backend , SegmentMeta } ;
2426use crate :: io:: buf:: ZeroCopyBuf ;
@@ -27,6 +29,7 @@ use crate::io::{FileExt, Io, StdIO};
2729use crate :: segment:: compacted:: CompactedSegmentDataHeader ;
2830use crate :: segment:: Frame ;
2931use crate :: storage:: { Error , RestoreOptions , Result } ;
32+ use crate :: LIBSQL_MAGIC ;
3033
3134pub struct S3Backend < IO > {
3235 client : Client ,
@@ -44,6 +47,16 @@ impl S3Backend<StdIO> {
4447 }
4548}
4649
50+ /// Header for segment index stored into s3
51+ #[ repr( C ) ]
52+ #[ derive( Copy , Clone , Debug , AsBytes , FromZeroes , FromBytes ) ]
53+ struct SegmentIndexHeader {
54+ magic : lu64 ,
55+ version : lu16 ,
56+ len : lu64 ,
57+ checksum : lu32 ,
58+ }
59+
4760impl < IO : Io > S3Backend < IO > {
4861 #[ doc( hidden) ]
4962 pub async fn from_sdk_config_with_io (
@@ -153,14 +166,19 @@ impl<IO: Io> S3Backend<IO> {
153166 segment_key : & SegmentKey ,
154167 ) -> Result < fst:: Map < Vec < u8 > > > {
155168 let s3_index_key = s3_segment_index_key ( folder_key, segment_key) ;
156- let stream = self . s3_get ( config, s3_index_key) . await ?;
157- // TODO: parse header, check if too large to fit memory
158- let bytes = stream
159- . collect ( )
160- . await
161- . map_err ( |e| Error :: unhandled ( e, "" ) ) ?
162- . to_vec ( ) ;
163- let index = fst:: Map :: new ( bytes) . map_err ( |_| Error :: InvalidIndex ) ?;
169+ let mut stream = self . s3_get ( config, s3_index_key) . await ?. into_async_read ( ) ;
170+ let mut header: SegmentIndexHeader = SegmentIndexHeader :: new_zeroed ( ) ;
171+ stream. read_exact ( header. as_bytes_mut ( ) ) . await ?;
172+ if header. magic . get ( ) != LIBSQL_MAGIC && header. version . get ( ) != 1 {
173+ return Err ( Error :: InvalidIndex ( "index header magic or version invalid" ) ) ;
174+ }
175+ let mut data = Vec :: with_capacity ( header. len . get ( ) as _ ) ;
176+ while stream. read_buf ( & mut data) . await ? != 0 { }
177+ let checksum = crc32fast:: hash ( & data) ;
178+ if checksum != header. checksum . get ( ) {
179+ return Err ( Error :: InvalidIndex ( "invalid index data checksum" ) ) ;
180+ }
181+ let index = fst:: Map :: new ( data) . map_err ( |_| Error :: InvalidIndex ( "invalid index bytes" ) ) ?;
164182 Ok ( index)
165183 }
166184
@@ -408,8 +426,20 @@ where
408426
409427 let s3_index_key = s3_segment_index_key ( & folder_key, & segment_key) ;
410428
411- // TODO: store meta about the index?
412- let body = ByteStream :: from ( segment_index) ;
429+ let checksum = crc32fast:: hash ( & segment_index) ;
430+ let header = SegmentIndexHeader {
431+ version : 1 . into ( ) ,
432+ len : ( segment_index. len ( ) as u64 ) . into ( ) ,
433+ checksum : checksum. into ( ) ,
434+ magic : LIBSQL_MAGIC . into ( ) ,
435+ } ;
436+
437+ let mut bytes =
438+ BytesMut :: with_capacity ( size_of :: < SegmentIndexHeader > ( ) + segment_index. len ( ) ) ;
439+ bytes. extend_from_slice ( header. as_bytes ( ) ) ;
440+ bytes. extend_from_slice ( & segment_index) ;
441+
442+ let body = ByteStream :: from ( bytes. freeze ( ) ) ;
413443
414444 self . s3_put ( config, s3_index_key, body) . await ?;
415445
0 commit comments