@@ -539,13 +539,17 @@ impl SegmentIndex {
539539
540540#[ cfg( test) ]
541541mod test {
542- use std:: io:: Read ;
542+ use std:: env:: temp_dir;
543+ use std:: io:: { self , Read } ;
543544
545+ use chrono:: { DateTime , Utc } ;
546+ use hashbrown:: HashMap ;
544547 use insta:: assert_debug_snapshot;
545- use tempfile:: tempfile;
548+ use tempfile:: { tempdir , tempfile} ;
546549 use tokio_stream:: StreamExt ;
550+ use uuid:: Uuid ;
547551
548- use crate :: io:: FileExt ;
552+ use crate :: io:: { FileExt , Io } ;
549553 use crate :: test:: { seal_current_segment, TestEnv } ;
550554
551555 use super :: * ;
@@ -694,4 +698,189 @@ mod test {
694698 assert_eq ! ( size_after, 2 ) ;
695699 assert_eq ! ( stream. fold( 0 , |count, _| count + 1 ) . await , 0 ) ;
696700 }
701+
702+ #[ tokio:: test]
703+ async fn crash_on_flush ( ) {
704+
705+ #[ derive( Clone , Default ) ]
706+ struct SyncFailBufferIo {
707+ inner : Arc < Mutex < HashMap < PathBuf , Arc < Mutex < Vec < u8 > > > > > >
708+
709+ }
710+
711+ struct File {
712+ path : PathBuf ,
713+ io : SyncFailBufferIo ,
714+ }
715+
716+ impl File {
717+ fn inner ( & self ) -> Arc < Mutex < Vec < u8 > > > {
718+ self . io . inner . lock ( ) . get ( & self . path ) . cloned ( ) . unwrap ( )
719+ }
720+ }
721+
722+ impl FileExt for File {
723+ fn len ( & self ) -> std:: io:: Result < u64 > {
724+ Ok ( self . inner ( ) . lock ( ) . len ( ) as u64 )
725+ }
726+
727+ fn write_at_vectored ( & self , bufs : & [ IoSlice ] , offset : u64 ) -> std:: io:: Result < usize > {
728+ let mut written = 0 ;
729+ for buf in bufs {
730+ self . write_at ( buf. as_bytes ( ) , written + offset) ?;
731+ written += buf. len ( ) as u64 ;
732+ }
733+ Ok ( written as _ )
734+ }
735+
736+ fn write_at ( & self , buf : & [ u8 ] , offset : u64 ) -> std:: io:: Result < usize > {
737+ let data = self . inner ( ) ;
738+ let mut data = data. lock ( ) ;
739+ let new_len = offset as usize + buf. len ( ) ;
740+ let old_len = data. len ( ) ;
741+ if old_len < new_len {
742+ data. extend ( std:: iter:: repeat ( 0 ) . take ( new_len - old_len) ) ;
743+ }
744+ data[ offset as usize ..offset as usize + buf. len ( ) ] . copy_from_slice ( buf) ;
745+ Ok ( buf. len ( ) )
746+ }
747+
748+ fn read_at ( & self , buf : & mut [ u8 ] , offset : u64 ) -> std:: io:: Result < usize > {
749+ let inner = self . inner ( ) ;
750+ let inner = inner. lock ( ) ;
751+ if offset >= inner. len ( ) as u64 {
752+ return Ok ( 0 )
753+ }
754+
755+ let read_len = if offset as usize + buf. len ( ) > inner. len ( ) {
756+ offset as usize + buf. len ( ) - inner. len ( )
757+ } else {
758+ buf. len ( )
759+ } ;
760+ buf[ ..read_len] . copy_from_slice ( & inner[ offset as usize ..offset as usize + read_len] ) ;
761+ Ok ( read_len)
762+ }
763+
764+ fn sync_all ( & self ) -> std:: io:: Result < ( ) > {
765+ // simulate a flush that only flushes half the pages and then fail
766+ let inner = self . inner ( ) ;
767+ let inner = inner. lock ( ) ;
768+ let npages = inner. len ( ) / 4096 ;
769+ std:: fs:: write ( & self . path , & inner[ ..4096 * ( npages / 2 ) ] ) ?;
770+ Err ( io:: Error :: new ( io:: ErrorKind :: BrokenPipe , "" ) )
771+ }
772+
773+ fn set_len ( & self , _len : u64 ) -> std:: io:: Result < ( ) > {
774+ todo ! ( )
775+ }
776+
777+ async fn read_exact_at_async < B : IoBufMut + Send + ' static > (
778+ & self ,
779+ mut buf : B ,
780+ offset : u64 ,
781+ ) -> ( B , std:: io:: Result < ( ) > ) {
782+ let slice = unsafe { std:: slice:: from_raw_parts_mut ( buf. stable_mut_ptr ( ) , buf. bytes_total ( ) ) } ;
783+ let ret = self . read_at ( slice, offset) ;
784+ ( buf, ret. map ( |_| ( ) ) )
785+
786+ }
787+
788+ async fn read_at_async < B : IoBufMut + Send + ' static > (
789+ & self ,
790+ _buf : B ,
791+ _offset : u64 ,
792+ ) -> ( B , std:: io:: Result < usize > ) {
793+ todo ! ( )
794+ }
795+
796+ async fn write_all_at_async < B : crate :: io:: buf:: IoBuf + Send + ' static > (
797+ & self ,
798+ _buf : B ,
799+ _offset : u64 ,
800+ ) -> ( B , std:: io:: Result < ( ) > ) {
801+ todo ! ( )
802+ }
803+ }
804+
805+ impl Io for SyncFailBufferIo {
806+ type File = File ;
807+
808+ type TempFile = File ;
809+
810+ fn create_dir_all ( & self , path : & std:: path:: Path ) -> std:: io:: Result < ( ) > {
811+ std:: fs:: create_dir_all ( path)
812+ }
813+
814+ fn open (
815+ & self ,
816+ _create_new : bool ,
817+ _read : bool ,
818+ _write : bool ,
819+ path : & std:: path:: Path ,
820+ ) -> std:: io:: Result < Self :: File > {
821+ let mut inner = self . inner . lock ( ) ;
822+ if !inner. contains_key ( path) {
823+ let data = if path. exists ( ) {
824+ std:: fs:: read ( path) . map_err ( |e| dbg ! ( e) ) ?
825+ } else {
826+ vec ! [ ]
827+ } ;
828+ inner. insert ( path. to_owned ( ) , Arc :: new ( Mutex :: new ( data) ) ) ;
829+ }
830+
831+ Ok ( File {
832+ path : path. into ( ) ,
833+ io : self . clone ( ) ,
834+ } )
835+
836+ }
837+
838+ fn tempfile ( & self ) -> std:: io:: Result < Self :: TempFile > {
839+ todo ! ( )
840+ }
841+
842+ fn now ( & self ) -> DateTime < Utc > {
843+ Utc :: now ( )
844+ }
845+
846+ fn uuid ( & self ) -> uuid:: Uuid {
847+ Uuid :: new_v4 ( )
848+ }
849+
850+ fn hard_link ( & self , _src : & std:: path:: Path , _dst : & std:: path:: Path ) -> std:: io:: Result < ( ) > {
851+ todo ! ( )
852+ }
853+ }
854+
855+ let tmp = Arc :: new ( tempdir ( ) . unwrap ( ) ) ;
856+ {
857+ let env = TestEnv :: new_io_and_tmp ( SyncFailBufferIo :: default ( ) , tmp. clone ( ) ) ;
858+ let conn = env. open_conn ( "test" ) ;
859+ let shared = env. shared ( "test" ) ;
860+
861+ conn. execute ( "create table test (x)" , ( ) ) . unwrap ( ) ;
862+ conn. execute ( "insert into test values (1234)" , ( ) ) . unwrap ( ) ;
863+ conn. execute ( "insert into test values (1234)" , ( ) ) . unwrap ( ) ;
864+ conn. execute ( "insert into test values (1234)" , ( ) ) . unwrap ( ) ;
865+
866+ // trigger a flush, that will fail. When we reopen the db, the log should need recovery
867+ // this simulates a crash before flush
868+ {
869+ let mut tx = shared. begin_read ( 99999 ) . into ( ) ;
870+ shared. upgrade ( & mut tx) . unwrap ( ) ;
871+ let mut guard = tx. as_write_mut ( ) . unwrap ( ) . lock ( ) ;
872+ guard. commit ( ) ;
873+ let _ = shared. swap_current ( & mut guard) ;
874+ }
875+ }
876+
877+ {
878+ let env = TestEnv :: new_io_and_tmp ( SyncFailBufferIo :: default ( ) , tmp. clone ( ) ) ;
879+ let conn = env. open_conn ( "test" ) ;
880+ conn. query_row ( "select count(*) from test" , ( ) , |row| {
881+ dbg ! ( row) ;
882+ Ok ( ( ) )
883+ } ) . unwrap ( ) ;
884+ }
885+ }
697886}
0 commit comments