1+ use std:: hash:: Hasher ;
12use std:: io:: { BufWriter , IoSlice , Write } ;
23use std:: num:: NonZeroU64 ;
34use std:: ops:: DerefMut ;
45use std:: path:: PathBuf ;
6+ use std:: sync:: atomic:: AtomicU32 ;
57use std:: sync:: {
68 atomic:: { AtomicBool , AtomicU64 , Ordering } ,
79 Arc ,
@@ -16,9 +18,11 @@ use zerocopy::{AsBytes, FromZeroes};
1618
1719use crate :: io:: buf:: { IoBufMut , ZeroCopyBoxIoBuf , ZeroCopyBuf } ;
1820use crate :: io:: file:: FileExt ;
19- use crate :: segment:: SegmentFlags ;
21+ use crate :: io:: Inspect ;
22+ use crate :: segment:: { checked_frame_offset, SegmentFlags } ;
2023use crate :: segment:: { frame_offset, page_offset, sealed:: SealedSegment } ;
2124use crate :: transaction:: { Transaction , TxGuard } ;
25+ use crate :: LIBSQL_MAGIC ;
2226
2327use super :: list:: SegmentList ;
2428use super :: { Frame , FrameHeader , SegmentHeader } ;
@@ -34,6 +38,8 @@ pub struct CurrentSegment<F> {
3438 /// lock
3539 read_locks : Arc < AtomicU64 > ,
3640 sealed : AtomicBool ,
41+ /// current runnign checksum
42+ current_checksum : AtomicU32 ,
3743 tail : Arc < SegmentList < SealedSegment < F > > > ,
3844}
3945
@@ -46,6 +52,7 @@ impl<F> CurrentSegment<F> {
4652 start_frame_no : NonZeroU64 ,
4753 db_size : u32 ,
4854 tail : Arc < SegmentList < SealedSegment < F > > > ,
55+ salt : u32 ,
4956 ) -> Result < Self >
5057 where
5158 F : FileExt ,
@@ -60,6 +67,7 @@ impl<F> CurrentSegment<F> {
6067 flags : 0 . into ( ) ,
6168 magic : LIBSQL_MAGIC . into ( ) ,
6269 version : 1 . into ( ) ,
70+ salt : salt. into ( ) ,
6371 } ;
6472
6573 header. recompute_checksum ( ) ;
@@ -74,6 +82,7 @@ impl<F> CurrentSegment<F> {
7482 read_locks : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
7583 sealed : AtomicBool :: default ( ) ,
7684 tail,
85+ current_checksum : salt. into ( ) ,
7786 } )
7887 }
7988
@@ -102,10 +111,14 @@ impl<F> CurrentSegment<F> {
102111 self . header . lock ( ) . size_after . get ( )
103112 }
104113
114+ pub fn current_checksum ( & self ) -> u32 {
115+ self . current_checksum . load ( Ordering :: Relaxed )
116+ }
117+
105118 /// insert a bunch of frames in the Wal. The frames needn't be ordered, therefore, on commit
106119 /// the last frame no needs to be passed alongside the new size_after.
107120 #[ tracing:: instrument( skip_all) ]
108- pub async fn insert_frames (
121+ pub async fn insert_frames_inject (
109122 & self ,
110123 frames : Vec < Box < Frame > > ,
111124 // (size_after, last_frame_no)
@@ -116,26 +129,48 @@ impl<F> CurrentSegment<F> {
116129 F : FileExt ,
117130 {
118131 assert ! ( !self . sealed. load( Ordering :: SeqCst ) ) ;
132+ assert_eq ! (
133+ tx. savepoints. len( ) ,
134+ 1 ,
135+ "injecting wal should not use savepoints"
136+ ) ;
119137 {
120138 let tx = tx. deref_mut ( ) ;
121139 // let mut commit_frame_written = false;
122140 let current_savepoint = tx. savepoints . last_mut ( ) . expect ( "no savepoints initialized" ) ;
123141 let mut frames = frame_list_to_option ( frames) ;
142+ // For each frame, we compute and write the frame checksum, followed by the frame
143+ // itself as an array of CheckedFrame
124144 for i in 0 ..frames. len ( ) {
125145 let offset = tx. next_offset ;
146+ let current_checksum = current_savepoint. current_checksum ;
147+ let mut digest = crc32fast:: Hasher :: new_with_initial ( current_checksum) ;
148+ digest. write ( frames[ i] . as_ref ( ) . unwrap ( ) . as_bytes ( ) ) ;
149+ let new_checksum = digest. finalize ( ) ;
150+ let ( _buf, ret) = self
151+ . file
152+ . write_all_at_async (
153+ ZeroCopyBuf :: new_init ( zerocopy:: byteorder:: little_endian:: U32 :: new (
154+ new_checksum,
155+ ) ) ,
156+ checked_frame_offset ( offset) ,
157+ )
158+ . await ;
159+ ret?;
160+
126161 let buf = ZeroCopyBoxIoBuf :: new ( frames[ i] . take ( ) . unwrap ( ) ) ;
127162 let ( buf, ret) = self
128163 . file
129164 . write_all_at_async ( buf, frame_offset ( offset) )
130165 . await ;
131-
132166 ret?;
133167
134168 let frame = buf. into_inner ( ) ;
135169
136170 current_savepoint
137171 . index
138172 . insert ( frame. header ( ) . page_no ( ) , offset) ;
173+ current_savepoint. current_checksum = new_checksum;
139174 tx. next_offset += 1 ;
140175 frames[ i] = Some ( frame) ;
141176 }
@@ -161,6 +196,8 @@ impl<F> CurrentSegment<F> {
161196 tx. merge_savepoints ( & self . index ) ;
162197 // set the header last, so that a transaction does not witness a write before
163198 // it's actually committed.
199+ self . current_checksum
200+ . store ( tx. current_checksum ( ) , Ordering :: Relaxed ) ;
164201 * self . header . lock ( ) = header. into_inner ( ) ;
165202
166203 tx. is_commited = true ;
@@ -194,6 +231,11 @@ impl<F> CurrentSegment<F> {
194231 if let Some ( offset) = current_savepoint. index . get ( & page_no) {
195232 tracing:: trace!( page_no, "recycling frame" ) ;
196233 self . file . write_all_at ( page, page_offset ( * offset) ) ?;
234+ // we overwrote a frame, record that for later rewrite
235+ tx. recompute_checksum = Some ( tx
236+ . recompute_checksum
237+ . map ( |old| old. min ( * offset) )
238+ . unwrap_or ( * offset) ) ;
197239 continue ;
198240 }
199241
@@ -210,24 +252,41 @@ impl<F> CurrentSegment<F> {
210252 size_after : size_after. into ( ) ,
211253 frame_no : frame_no. into ( ) ,
212254 } ;
213- let slices = & [ IoSlice :: new ( header. as_bytes ( ) ) , IoSlice :: new ( & page) ] ;
255+
256+ let mut digest =
257+ crc32fast:: Hasher :: new_with_initial ( current_savepoint. current_checksum ) ;
258+ digest. write ( header. as_bytes ( ) ) ;
259+ digest. write ( page) ;
260+
261+ let checksum = digest. finalize ( ) ;
262+ let checksum_bytes = checksum. to_le_bytes ( ) ;
263+ // We write a instance of a ChecksummedFrame
264+ let slices = & [
265+ IoSlice :: new ( & checksum_bytes) ,
266+ IoSlice :: new ( header. as_bytes ( ) ) ,
267+ IoSlice :: new ( & page) ,
268+ ] ;
214269 let offset = tx. next_offset ;
215270 debug_assert_eq ! (
216271 self . header. lock( ) . start_frame_no. get( ) + offset as u64 ,
217272 frame_no
218273 ) ;
219- self . file . write_at_vectored ( slices, frame_offset ( offset) ) ?;
274+ self . file . write_at_vectored ( slices, checked_frame_offset ( offset) ) ?;
220275 assert ! (
221276 current_savepoint. index. insert( page_no, offset) . is_none( ) ,
222277 "existing frames should be recycled"
223278 ) ;
279+ current_savepoint. current_checksum = checksum;
224280 tx. next_frame_no += 1 ;
225281 tx. next_offset += 1 ;
226282 }
227283 }
228284
229285 if let Some ( size_after) = size_after {
230286 if tx. not_empty ( ) {
287+ if let Some ( _offset) = tx. recompute_checksum {
288+ todo ! ( "recompute checksum" ) ;
289+ }
231290 let last_frame_no = tx. next_frame_no - 1 ;
232291 let mut header = { * self . header . lock ( ) } ;
233292 header. last_commited_frame_no = last_frame_no. into ( ) ;
@@ -240,6 +299,8 @@ impl<F> CurrentSegment<F> {
240299 // set the header last, so that a transaction does not witness a write before
241300 // it's actually committed.
242301 * self . header . lock ( ) = header;
302+ self . current_checksum
303+ . store ( tx. current_checksum ( ) , Ordering :: Relaxed ) ;
243304
244305 tx. is_commited = true ;
245306
@@ -281,7 +342,7 @@ impl<F> CurrentSegment<F> {
281342 F : FileExt ,
282343 B : IoBufMut + Send + ' static ,
283344 {
284- let byte_offset = frame_offset ( offset) ;
345+ let byte_offset = dbg ! ( frame_offset( dbg! ( offset) ) ) ;
285346 self . file . read_exact_at_async ( buf, byte_offset) . await
286347 }
287348
@@ -304,10 +365,21 @@ impl<F> CurrentSegment<F> {
304365 {
305366 let mut header = self . header . lock ( ) ;
306367 let index_offset = header. count_committed ( ) as u32 ;
307- let index_byte_offset = frame_offset ( index_offset) ;
368+ let index_byte_offset = checked_frame_offset ( index_offset) ;
308369 let mut cursor = self . file . cursor ( index_byte_offset) ;
309- let mut writer = BufWriter :: new ( & mut cursor) ;
370+ let writer = BufWriter :: new ( & mut cursor) ;
371+
372+ let current = self . current_checksum ( ) ;
373+ let mut digest = crc32fast:: Hasher :: new_with_initial ( current) ;
374+ let mut writer = Inspect :: new ( writer, |data : & [ u8 ] | {
375+ digest. write ( data) ;
376+ } ) ;
310377 self . index . merge_all ( & mut writer) ?;
378+ let mut writer = writer. into_inner ( ) ;
379+ let index_checksum = digest. finalize ( ) ;
380+ let index_size = writer. get_ref ( ) . count ( ) ;
381+ writer. write_all ( & index_checksum. to_le_bytes ( ) ) ?;
382+
311383 writer. into_inner ( ) . map_err ( |e| e. into_parts ( ) . 0 ) ?;
312384 // we perform a first sync to ensure that all the segment has been flushed to disk. We then
313385 // write the header and flush again. We want to guarantee that if we find a segement marked
@@ -318,10 +390,10 @@ impl<F> CurrentSegment<F> {
318390 self . file . sync_all ( ) ?;
319391
320392 header. index_offset = index_byte_offset. into ( ) ;
321- header. index_size = cursor. count ( ) . into ( ) ;
322- header. recompute_checksum ( ) ;
393+ header. index_size = index_size. into ( ) ;
323394 let flags = header. flags ( ) ;
324395 header. set_flags ( flags | SegmentFlags :: SEALED ) ;
396+ header. recompute_checksum ( ) ;
325397 self . file . write_all_at ( header. as_bytes ( ) , 0 ) ?;
326398
327399 // flush the header.
0 commit comments