@@ -61,16 +61,23 @@ impl SyncError {
6161 }
6262}
6363
64+ pub enum PullResult {
65+ /// A frame was successfully pulled.
66+ Frame ( Bytes ) ,
67+ /// We've reached the end of the generation.
68+ EndOfGeneration { max_generation : u32 } ,
69+ }
70+
6471pub struct SyncContext {
6572 db_path : String ,
6673 client : hyper:: Client < ConnectorService , Body > ,
6774 sync_url : String ,
6875 auth_token : Option < HeaderValue > ,
6976 max_retries : usize ,
77+ /// The current durable generation.
78+ durable_generation : u32 ,
7079 /// Represents the max_frame_no from the server.
7180 durable_frame_num : u32 ,
72- /// Represents the current checkpoint generation.
73- generation : u32 ,
7481}
7582
7683impl SyncContext {
@@ -96,8 +103,8 @@ impl SyncContext {
96103 auth_token,
97104 max_retries : DEFAULT_MAX_RETRIES ,
98105 client,
106+ durable_generation : 1 ,
99107 durable_frame_num : 0 ,
100- generation : 1 ,
101108 } ;
102109
103110 if let Err ( e) = me. read_metadata ( ) . await {
@@ -115,7 +122,7 @@ impl SyncContext {
115122 & mut self ,
116123 generation : u32 ,
117124 frame_no : u32 ,
118- ) -> Result < Option < Bytes > > {
125+ ) -> Result < PullResult > {
119126 let uri = format ! (
120127 "{}/sync/{}/{}/{}" ,
121128 self . sync_url,
@@ -124,13 +131,7 @@ impl SyncContext {
124131 frame_no + 1
125132 ) ;
126133 tracing:: debug!( "pulling frame" ) ;
127- match self . pull_with_retry ( uri, self . max_retries ) . await ? {
128- Some ( frame) => {
129- self . durable_frame_num = frame_no;
130- Ok ( Some ( frame) )
131- }
132- None => Ok ( None ) ,
133- }
134+ self . pull_with_retry ( uri, self . max_retries ) . await
134135 }
135136
136137 #[ tracing:: instrument( skip( self , frame) ) ]
@@ -149,7 +150,7 @@ impl SyncContext {
149150 ) ;
150151 tracing:: debug!( "pushing frame" ) ;
151152
152- let durable_frame_num = self . push_with_retry ( uri, frame, self . max_retries ) . await ?;
153+ let ( generation , durable_frame_num) = self . push_with_retry ( uri, frame, self . max_retries ) . await ?;
153154
154155 if durable_frame_num > frame_no {
155156 tracing:: error!(
@@ -178,12 +179,14 @@ impl SyncContext {
178179 tracing:: debug!( ?durable_frame_num, "frame successfully pushed" ) ;
179180
180181 // Update our last known max_frame_no from the server.
182+ tracing:: debug!( ?generation, ?durable_frame_num, "updating remote generation and durable_frame_num" ) ;
183+ self . durable_generation = generation;
181184 self . durable_frame_num = durable_frame_num;
182185
183186 Ok ( durable_frame_num)
184187 }
185188
186- async fn push_with_retry ( & self , uri : String , frame : Bytes , max_retries : usize ) -> Result < u32 > {
189+ async fn push_with_retry ( & self , uri : String , frame : Bytes , max_retries : usize ) -> Result < ( u32 , u32 ) > {
187190 let mut nr_retries = 0 ;
188191 loop {
189192 let mut req = http:: Request :: post ( uri. clone ( ) ) ;
@@ -213,6 +216,14 @@ impl SyncContext {
213216 let resp = serde_json:: from_slice :: < serde_json:: Value > ( & res_body[ ..] )
214217 . map_err ( SyncError :: JsonDecode ) ?;
215218
219+ let generation = resp
220+ . get ( "generation" )
221+ . ok_or_else ( || SyncError :: JsonValue ( resp. clone ( ) ) ) ?;
222+
223+ let generation = generation
224+ . as_u64 ( )
225+ . ok_or_else ( || SyncError :: JsonValue ( generation. clone ( ) ) ) ?;
226+
216227 let max_frame_no = resp
217228 . get ( "max_frame_no" )
218229 . ok_or_else ( || SyncError :: JsonValue ( resp. clone ( ) ) ) ?;
@@ -221,7 +232,7 @@ impl SyncContext {
221232 . as_u64 ( )
222233 . ok_or_else ( || SyncError :: JsonValue ( max_frame_no. clone ( ) ) ) ?;
223234
224- return Ok ( max_frame_no as u32 ) ;
235+ return Ok ( ( generation as u32 , max_frame_no as u32 ) ) ;
225236 }
226237
227238 // If we've retried too many times or the error is not a server error,
@@ -244,7 +255,7 @@ impl SyncContext {
244255 }
245256 }
246257
247- async fn pull_with_retry ( & self , uri : String , max_retries : usize ) -> Result < Option < Bytes > > {
258+ async fn pull_with_retry ( & self , uri : String , max_retries : usize ) -> Result < PullResult > {
248259 let mut nr_retries = 0 ;
249260 loop {
250261 let mut req = http:: Request :: builder ( ) . method ( "GET" ) . uri ( uri. clone ( ) ) ;
@@ -268,10 +279,27 @@ impl SyncContext {
268279 let frame = hyper:: body:: to_bytes ( res. into_body ( ) )
269280 . await
270281 . map_err ( SyncError :: HttpBody ) ?;
271- return Ok ( Some ( frame) ) ;
282+ return Ok ( PullResult :: Frame ( frame) ) ;
283+ }
284+ if res. status ( ) == StatusCode :: BAD_REQUEST {
285+ let res_body = hyper:: body:: to_bytes ( res. into_body ( ) )
286+ . await
287+ . map_err ( SyncError :: HttpBody ) ?;
288+
289+ let resp = serde_json:: from_slice :: < serde_json:: Value > ( & res_body[ ..] )
290+ . map_err ( SyncError :: JsonDecode ) ?;
291+
292+ let generation = resp
293+ . get ( "generation" )
294+ . ok_or_else ( || SyncError :: JsonValue ( resp. clone ( ) ) ) ?;
295+
296+ let generation = generation
297+ . as_u64 ( )
298+ . ok_or_else ( || SyncError :: JsonValue ( generation. clone ( ) ) ) ?;
299+ return Ok ( PullResult :: EndOfGeneration { max_generation : generation as u32 } ) ;
272300 }
273301 if res. status ( ) == StatusCode :: BAD_REQUEST {
274- return Ok ( None ) ;
302+ return Err ( SyncError :: PullFrame ( res . status ( ) , "Bad Request" . to_string ( ) ) . into ( ) ) ;
275303 }
276304 // If we've retried too many times or the error is not a server error,
277305 // return the error.
@@ -293,12 +321,18 @@ impl SyncContext {
293321 }
294322 }
295323
324+
325+ pub ( crate ) fn next_generation ( & mut self ) {
326+ self . durable_generation += 1 ;
327+ self . durable_frame_num = 0 ;
328+ }
329+
296330 pub ( crate ) fn durable_frame_num ( & self ) -> u32 {
297331 self . durable_frame_num
298332 }
299333
300- pub ( crate ) fn generation ( & self ) -> u32 {
301- self . generation
334+ pub ( crate ) fn durable_generation ( & self ) -> u32 {
335+ self . durable_generation
302336 }
303337
304338 pub ( crate ) async fn write_metadata ( & mut self ) -> Result < ( ) > {
@@ -308,7 +342,7 @@ impl SyncContext {
308342 hash : 0 ,
309343 version : METADATA_VERSION ,
310344 durable_frame_num : self . durable_frame_num ,
311- generation : self . generation ,
345+ generation : self . durable_generation ,
312346 } ;
313347
314348 metadata. set_hash ( ) ;
@@ -350,8 +384,8 @@ impl SyncContext {
350384 metadata
351385 ) ;
352386
387+ self . durable_generation = metadata. generation ;
353388 self . durable_frame_num = metadata. durable_frame_num ;
354- self . generation = metadata. generation ;
355389
356390 Ok ( ( ) )
357391 }
@@ -436,10 +470,7 @@ pub async fn sync_offline(
436470 sync_ctx : & mut SyncContext ,
437471 conn : & Connection ,
438472) -> Result < crate :: database:: Replicated > {
439- let durable_frame_no = sync_ctx. durable_frame_num ( ) ;
440- let max_frame_no = conn. wal_frame_count ( ) ;
441-
442- if max_frame_no > durable_frame_no {
473+ if is_ahead_of_remote ( & sync_ctx, & conn) {
443474 match try_push ( sync_ctx, conn) . await {
444475 Ok ( rep) => Ok ( rep) ,
445476 Err ( Error :: Sync ( err) ) => {
@@ -475,6 +506,11 @@ pub async fn sync_offline(
475506 } )
476507}
477508
509+ fn is_ahead_of_remote ( sync_ctx : & SyncContext , conn : & Connection ) -> bool {
510+ let max_local_frame = conn. wal_frame_count ( ) ;
511+ max_local_frame > sync_ctx. durable_frame_num ( )
512+ }
513+
478514async fn try_push (
479515 sync_ctx : & mut SyncContext ,
480516 conn : & Connection ,
@@ -496,7 +532,7 @@ async fn try_push(
496532 } ) ;
497533 }
498534
499- let generation = sync_ctx. generation ( ) ; // TODO: Probe from WAL.
535+ let generation = sync_ctx. durable_generation ( ) ;
500536 let start_frame_no = sync_ctx. durable_frame_num ( ) + 1 ;
501537 let end_frame_no = max_frame_no;
502538
@@ -532,29 +568,60 @@ async fn try_pull(
532568 sync_ctx : & mut SyncContext ,
533569 conn : & Connection ,
534570) -> Result < crate :: database:: Replicated > {
535- let generation = sync_ctx. generation ( ) ;
536- let mut frame_no = sync_ctx. durable_frame_num ( ) + 1 ;
537-
538571 let insert_handle = conn. wal_insert_handle ( ) ?;
539572
573+ let mut err = None ;
574+
540575 loop {
576+ let generation = sync_ctx. durable_generation ( ) ;
577+ let frame_no = sync_ctx. durable_frame_num ( ) + 1 ;
541578 match sync_ctx. pull_one_frame ( generation, frame_no) . await {
542- Ok ( Some ( frame) ) => {
579+ Ok ( PullResult :: Frame ( frame) ) => {
543580 insert_handle. insert ( & frame) ?;
544- frame_no += 1 ;
581+ sync_ctx . durable_frame_num = frame_no ;
545582 }
546- Ok ( None ) => {
583+ Ok ( PullResult :: EndOfGeneration { max_generation } ) => {
584+ // If there are no more generations to pull, we're done.
585+ if generation >= max_generation {
586+ break ;
587+ }
588+ insert_handle. end ( ) ?;
547589 sync_ctx. write_metadata ( ) . await ?;
548- return Ok ( crate :: database:: Replicated {
549- frame_no : None ,
550- frames_synced : 1 ,
551- } ) ;
552- }
553- Err ( err) => {
554- tracing:: debug!( "pull_one_frame error: {:?}" , err) ;
590+
591+ // TODO: Make this crash-proof.
592+ conn. wal_checkpoint ( true ) ?;
593+
594+ sync_ctx. next_generation ( ) ;
555595 sync_ctx. write_metadata ( ) . await ?;
556- return Err ( err) ;
596+
597+ insert_handle. begin ( ) ?;
598+ }
599+ Err ( e) => {
600+ tracing:: debug!( "pull_one_frame error: {:?}" , e) ;
601+ err. replace ( e) ;
602+ break ;
557603 }
558604 }
559605 }
606+ // This is crash-proof because we:
607+ //
608+ // 1. Write WAL frame first
609+ // 2. Write new max frame to temporary metadata
610+ // 3. Atomically rename the temporary metadata to the real metadata
611+ //
612+ // If we crash before metadata rename completes, the old metadata still
613+ // points to last successful frame, allowing safe retry from that point.
614+ // If we happen to have the frame already in the WAL, it's fine to re-pull
615+ // because append locally is idempotent.
616+ insert_handle. end ( ) ?;
617+ sync_ctx. write_metadata ( ) . await ?;
618+
619+ if let Some ( err) = err {
620+ Err ( err)
621+ } else {
622+ Ok ( crate :: database:: Replicated {
623+ frame_no : None ,
624+ frames_synced : 1 ,
625+ } )
626+ }
560627}
0 commit comments