11use std:: collections:: BTreeMap ;
2- use std:: io:: BufWriter ;
2+ use std:: hash:: Hasher ;
3+ use std:: io:: { BufWriter , ErrorKind , Write } ;
34use std:: mem:: size_of;
45use std:: ops:: Deref ;
56use std:: path:: { Path , PathBuf } ;
@@ -15,6 +16,7 @@ use crate::error::Result;
1516use crate :: io:: buf:: { IoBufMut , ZeroCopyBuf } ;
1617use crate :: io:: file:: { BufCopy , FileExt } ;
1718use crate :: LIBSQL_MAGIC ;
19+ use crate :: io:: Inspect ;
1820
1921use super :: compacted:: { CompactedSegmentDataFooter , CompactedSegmentDataHeader } ;
2022use super :: { frame_offset, page_offset, Frame , FrameHeader , Segment , SegmentHeader , SegmentFlags } ;
@@ -198,7 +200,8 @@ impl<F: FileExt> SealedSegment<F> {
198200
199201 // This happens in case of crash: the segment is not empty, but it wasn't sealed. We need to
200202 // recover the index, and seal the segment.
201- if index_offset == 0 {
203+ if !header. flags ( ) . contains ( SegmentFlags :: SEALED ) {
204+ assert_eq ! ( header. index_offset. get( ) , 0 ) ;
202205 return Self :: recover ( file, path, header) . map ( Some ) ;
203206 }
204207
@@ -218,32 +221,79 @@ impl<F: FileExt> SealedSegment<F> {
218221 }
219222
220223 fn recover ( file : Arc < F > , path : PathBuf , mut header : SegmentHeader ) -> Result < Self > {
224+ assert ! ( !header. is_empty( ) ) ;
225+ assert_eq ! ( header. index_size. get( ) , 0 ) ;
226+ assert_eq ! ( header. index_offset. get( ) , 0 ) ;
227+ assert ! ( !header. flags( ) . contains( SegmentFlags :: SEALED ) ) ;
228+ // recovery for a replica log should take a different path (i.e., resync with the primary)
229+ assert ! ( !header. flags( ) . contains( SegmentFlags :: FRAME_UNORDERED ) ) ;
230+
231+ let mut current_checksum = header. salt . get ( ) ;
221232 tracing:: trace!( "recovering unsealed segment at {path:?}" ) ;
222233 let mut index = BTreeMap :: new ( ) ;
223- assert ! ( !header. is_empty( ) ) ;
224- let mut frame_header = FrameHeader :: new_zeroed ( ) ;
225- for i in 0 ..header. count_committed ( ) {
226- let offset = frame_offset ( i as u32 ) ;
227- file. read_exact_at ( frame_header. as_bytes_mut ( ) , offset) ?;
228- index. insert ( frame_header. page_no . get ( ) , i as u32 ) ;
234+ let mut frame: Box < CheckedFrame > = CheckedFrame :: new_box_zeroed ( ) ;
235+ let mut current_tx = Vec :: new ( ) ;
236+ let mut last_committed = 0 ;
237+ let mut size_after = 0 ;
238+ let mut frame_count = 0 ;
239+ for i in 0 .. {
240+ let offset = checked_frame_offset ( i as u32 ) ;
241+ match file. read_exact_at ( frame. as_bytes_mut ( ) , offset) {
242+ Ok ( _) => {
243+ let new_checksum = frame. frame . checksum ( current_checksum) ;
244+ // this is the first checksum that doesn't match the checksum chain: drop the
245+ // current transaction and any frames after it.
246+ if new_checksum != frame. checksum . get ( ) {
247+ tracing:: warn!(
248+ "found invalid checksum in segment, dropping {} frames" ,
249+ header. last_committed( ) - last_committed
250+ ) ;
251+ break ;
252+ }
253+ current_checksum = new_checksum;
254+ frame_count += 1 ;
255+
256+ current_tx. push ( frame. frame . header ( ) . page_no ( ) ) ;
257+ if frame. frame . header . is_commit ( ) {
258+ last_committed = frame. frame . header ( ) . frame_no ( ) ;
259+ size_after = frame. frame . header ( ) . size_after ( ) ;
260+ let base_offset = ( i + 1 ) - current_tx. len ( ) ;
261+ for ( frame_offset, page_no) in current_tx. drain ( ..) . enumerate ( ) {
262+ index. insert ( page_no, ( base_offset + frame_offset) as u32 ) ;
263+ }
264+ }
265+ }
266+ Err ( e) if e. kind ( ) == ErrorKind :: UnexpectedEof => break ,
267+ Err ( e) => return Err ( e. into ( ) ) ,
268+ }
229269 }
230270
231- let index_offset = header . count_committed ( ) as u32 ;
232- let index_byte_offset = frame_offset ( index_offset) ;
271+ let index_offset = frame_count as u32 ;
272+ let index_byte_offset = checked_frame_offset ( index_offset) ;
233273 let cursor = file. cursor ( index_byte_offset) ;
234274 let writer = BufCopy :: new ( cursor) ;
235- let mut writer = BufWriter :: new ( writer) ;
275+ let writer = BufWriter :: new ( writer) ;
276+ let mut digest = crc32fast:: Hasher :: new_with_initial ( current_checksum) ;
277+ let mut writer = Inspect :: new ( writer, |data : & [ u8 ] | {
278+ digest. write ( data) ;
279+ } ) ;
236280 let mut builder = MapBuilder :: new ( & mut writer) ?;
237281 for ( k, v) in index. into_iter ( ) {
238282 builder. insert ( k. to_be_bytes ( ) , v as u64 ) . unwrap ( ) ;
239283 }
240284 builder. finish ( ) . unwrap ( ) ;
241- let ( cursor, index_bytes) = writer
285+ let mut writer = writer. into_inner ( ) ;
286+ let index_size = writer. get_ref ( ) . get_ref ( ) . count ( ) ;
287+ let index_checksum = digest. finalize ( ) ;
288+ writer. write_all ( & index_checksum. to_le_bytes ( ) ) ?;
289+ let ( _, index_bytes) = writer
242290 . into_inner ( )
243291 . map_err ( |e| e. into_parts ( ) . 0 ) ?
244292 . into_parts ( ) ;
245293 header. index_offset = index_byte_offset. into ( ) ;
246- header. index_size = cursor. count ( ) . into ( ) ;
294+ header. index_size = index_size. into ( ) ;
295+ header. last_commited_frame_no = last_committed. into ( ) ;
296+ header. size_after = size_after. into ( ) ;
247297 header. recompute_checksum ( ) ;
248298 file. write_all_at ( header. as_bytes ( ) , 0 ) ?;
249299 let index = Map :: new ( index_bytes. into ( ) ) . unwrap ( ) ;
0 commit comments