@@ -67,6 +67,7 @@ impl Builder<()> {
6767 http_request_callback: None ,
6868 namespace: None ,
6969 skip_safety_assert: false ,
70+ sync_protocol: Default :: default ( ) ,
7071 } ,
7172 }
7273 }
@@ -222,6 +223,7 @@ cfg_replication! {
222223 http_request_callback: Option <crate :: util:: HttpRequestCallback >,
223224 namespace: Option <String >,
224225 skip_safety_assert: bool ,
226+ sync_protocol: super :: SyncProtocol ,
225227 }
226228
227229 /// Local replica configuration type in [`Builder`].
@@ -274,6 +276,14 @@ cfg_replication! {
274276 self
275277 }
276278
279+ /// Set the duration at which the replicator will automatically call `sync` in the
280+ /// background. The sync will continue for the duration that the resulted `Database`
281+ /// type is alive for, once it is dropped the background task will get dropped and stop.
282+ pub fn sync_protocol( mut self , protocol: super :: SyncProtocol ) -> Builder <RemoteReplica > {
283+ self . inner. sync_protocol = protocol;
284+ self
285+ }
286+
277287 pub fn http_request_callback<F >( mut self , f: F ) -> Builder <RemoteReplica >
278288 where
279289 F : Fn ( & mut http:: Request <( ) >) + Send + Sync + ' static
@@ -326,7 +336,8 @@ cfg_replication! {
326336 sync_interval,
327337 http_request_callback,
328338 namespace,
329- skip_safety_assert
339+ skip_safety_assert,
340+ sync_protocol,
330341 } = self . inner;
331342
332343 let connector = if let Some ( connector) = connector {
@@ -342,26 +353,44 @@ cfg_replication! {
342353 crate :: util:: ConnectorService :: new( svc)
343354 } ;
344355
345- let client = hyper:: client:: Client :: builder( )
346- . build:: <_, hyper:: Body >( connector. clone( ) ) ;
347-
348- let req = http:: Request :: get( format!( "{url}/sync/0/0/0" ) )
349- . header( "Authorization" , format!( "Bearer {}" , auth_token) )
350- . body( hyper:: Body :: empty( ) )
351- . unwrap( ) ;
352-
353- let res = client
354- . request( req)
355- . await
356- . map_err( |err| crate :: Error :: Sync ( err. into( ) ) ) ?;
357-
358- match res. status( ) {
359- hyper:: StatusCode :: OK => return Builder :: new_synced_database( path, url, auth_token)
360- . remote_writes( true )
361- . read_your_writes( read_your_writes)
362- . build( )
363- . await ,
364- _ => { }
356+ use super :: SyncProtocol ;
357+
358+ match sync_protocol {
359+ p @ ( SyncProtocol :: Auto | SyncProtocol :: V2 ) => {
360+ let client = hyper:: client:: Client :: builder( )
361+ . build:: <_, hyper:: Body >( connector. clone( ) ) ;
362+
363+ let req = http:: Request :: get( format!( "{url}/sync/0/0/0" ) )
364+ . header( "Authorization" , format!( "Bearer {}" , auth_token) )
365+ . body( hyper:: Body :: empty( ) )
366+ . unwrap( ) ;
367+
368+ let res = client
369+ . request( req)
370+ . await
371+ . map_err( |err| crate :: Error :: Sync ( err. into( ) ) ) ?;
372+
373+ if matches!( p, SyncProtocol :: V2 ) {
374+ if !res. status( ) . is_success( ) {
375+ let status = res. status( ) ;
376+ let body_bytes = hyper:: body:: to_bytes( res. into_body( ) )
377+ . await
378+ . map_err( |err| crate :: Error :: Sync ( err. into( ) ) ) ?;
379+ let error_message = String :: from_utf8_lossy( & body_bytes) ;
380+ return Err ( crate :: Error :: Sync ( format!( "HTTP error {}: {}" , status, error_message) . into( ) ) ) ;
381+ }
382+ }
383+
384+ if res. status( ) . is_success( ) {
385+ return Builder :: new_synced_database( path, url, auth_token)
386+ . remote_writes( true )
387+ . read_your_writes( read_your_writes)
388+ . build( )
389+ . await ;
390+ }
391+
392+ }
393+ SyncProtocol :: V1 => { }
365394 }
366395
367396 let path = path. to_str( ) . ok_or( crate :: Error :: InvalidUTF8Path ) ?. to_owned( ) ;
0 commit comments