@@ -2,32 +2,45 @@ use crate::{util::ConnectorService, Result};
22use bytes:: Bytes ;
33use hyper:: Body ;
44
5+ const METADATA_VERSION : u32 = 0 ;
6+
57const DEFAULT_MAX_RETRIES : usize = 5 ;
68
79pub struct SyncContext {
10+ db_path : String ,
811 sync_url : String ,
912 auth_token : Option < String > ,
1013 max_retries : usize ,
14+ /// Represents the max_frame_no from the server.
1115 durable_frame_num : u32 ,
1216 client : hyper:: Client < ConnectorService , Body > ,
1317}
1418
1519impl SyncContext {
16- pub fn new ( connector : ConnectorService , sync_url : String , auth_token : Option < String > ) -> Self {
17- // TODO(lucio): add custom connector + tls support here
20+ pub async fn new (
21+ connector : ConnectorService ,
22+ db_path : String ,
23+ sync_url : String ,
24+ auth_token : Option < String > ,
25+ ) -> Result < Self > {
1826 let client = hyper:: client:: Client :: builder ( ) . build :: < _ , hyper:: Body > ( connector) ;
1927
20- Self {
28+ let mut me = Self {
29+ db_path,
2130 sync_url,
2231 auth_token,
2332 durable_frame_num : 0 ,
2433 max_retries : DEFAULT_MAX_RETRIES ,
2534 client,
26- }
35+ } ;
36+
37+ me. read_metadata ( ) . await ?;
38+
39+ Ok ( me)
2740 }
2841
2942 pub ( crate ) async fn push_one_frame (
30- & self ,
43+ & mut self ,
3144 frame : Bytes ,
3245 generation : u32 ,
3346 frame_no : u32 ,
@@ -41,6 +54,11 @@ impl SyncContext {
4154 ) ;
4255 let max_frame_no = self . push_with_retry ( uri, frame, self . max_retries ) . await ?;
4356
57+ // Update our last known max_frame_no from the server.
58+ self . durable_frame_num = max_frame_no;
59+
60+ self . write_metadata ( ) . await ?;
61+
4462 Ok ( max_frame_no)
4563 }
4664
@@ -93,4 +111,41 @@ impl SyncContext {
93111 pub ( crate ) fn durable_frame_num ( & self ) -> u32 {
94112 self . durable_frame_num
95113 }
114+
115+ async fn write_metadata ( & mut self ) -> Result < ( ) > {
116+ let path = format ! ( "{}-info" , self . db_path) ;
117+
118+ let contents = serde_json:: to_vec ( & MetadataJson {
119+ version : METADATA_VERSION ,
120+ durable_frame_num : self . durable_frame_num ,
121+ } )
122+ . unwrap ( ) ;
123+
124+ tokio:: fs:: write ( path, contents) . await . unwrap ( ) ;
125+
126+ Ok ( ( ) )
127+ }
128+
129+ async fn read_metadata ( & mut self ) -> Result < ( ) > {
130+ let path = format ! ( "{}-info" , self . db_path) ;
131+
132+ let contents = tokio:: fs:: read ( & path) . await . unwrap ( ) ;
133+
134+ let metadata = serde_json:: from_slice :: < MetadataJson > ( & contents[ ..] ) . unwrap ( ) ;
135+
136+ assert_eq ! (
137+ metadata. version, METADATA_VERSION ,
138+ "Reading metadata from a different version than expected"
139+ ) ;
140+
141+ self . durable_frame_num = metadata. durable_frame_num ;
142+
143+ Ok ( ( ) )
144+ }
145+ }
146+
147+ #[ derive( serde:: Serialize , serde:: Deserialize ) ]
148+ struct MetadataJson {
149+ version : u32 ,
150+ durable_frame_num : u32 ,
96151}
0 commit comments