@@ -67,6 +67,8 @@ impl Builder<()> {
6767 http_request_callback: None ,
6868 namespace: None ,
6969 skip_safety_assert: false ,
70+ #[ cfg( feature = "sync" ) ]
71+ sync_protocol: Default :: default ( ) ,
7072 } ,
7173 }
7274 }
@@ -222,6 +224,8 @@ cfg_replication! {
222224 http_request_callback: Option <crate :: util:: HttpRequestCallback >,
223225 namespace: Option <String >,
224226 skip_safety_assert: bool ,
227+ #[ cfg( feature = "sync" ) ]
228+ sync_protocol: super :: SyncProtocol ,
225229 }
226230
227231 /// Local replica configuration type in [`Builder`].
@@ -274,6 +278,15 @@ cfg_replication! {
274278 self
275279 }
276280
281+ /// Set the duration at which the replicator will automatically call `sync` in the
282+ /// background. The sync will continue for the duration that the resulted `Database`
283+ /// type is alive for, once it is dropped the background task will get dropped and stop.
284+ #[ cfg( feature = "sync" ) ]
285+ pub fn sync_protocol( mut self , protocol: super :: SyncProtocol ) -> Builder <RemoteReplica > {
286+ self . inner. sync_protocol = protocol;
287+ self
288+ }
289+
277290 pub fn http_request_callback<F >( mut self , f: F ) -> Builder <RemoteReplica >
278291 where
279292 F : Fn ( & mut http:: Request <( ) >) + Send + Sync + ' static
@@ -326,7 +339,9 @@ cfg_replication! {
326339 sync_interval,
327340 http_request_callback,
328341 namespace,
329- skip_safety_assert
342+ skip_safety_assert,
343+ #[ cfg( feature = "sync" ) ]
344+ sync_protocol,
330345 } = self . inner;
331346
332347 let connector = if let Some ( connector) = connector {
@@ -342,6 +357,48 @@ cfg_replication! {
342357 crate :: util:: ConnectorService :: new( svc)
343358 } ;
344359
360+ #[ cfg( feature = "sync" ) ]
361+ {
362+ use super :: SyncProtocol ;
363+ match sync_protocol {
364+ p @ ( SyncProtocol :: Auto | SyncProtocol :: V2 ) => {
365+ let client = hyper:: client:: Client :: builder( )
366+ . build:: <_, hyper:: Body >( connector. clone( ) ) ;
367+
368+ let req = http:: Request :: get( format!( "{url}/sync/0/0/0" ) )
369+ . header( "Authorization" , format!( "Bearer {}" , auth_token) )
370+ . body( hyper:: Body :: empty( ) )
371+ . unwrap( ) ;
372+
373+ let res = client
374+ . request( req)
375+ . await
376+ . map_err( |err| crate :: Error :: Sync ( err. into( ) ) ) ?;
377+
378+ if matches!( p, SyncProtocol :: V2 ) {
379+ if !res. status( ) . is_success( ) {
380+ let status = res. status( ) ;
381+ let body_bytes = hyper:: body:: to_bytes( res. into_body( ) )
382+ . await
383+ . map_err( |err| crate :: Error :: Sync ( err. into( ) ) ) ?;
384+ let error_message = String :: from_utf8_lossy( & body_bytes) ;
385+ return Err ( crate :: Error :: Sync ( format!( "HTTP error {}: {}" , status, error_message) . into( ) ) ) ;
386+ }
387+ }
388+
389+ if res. status( ) . is_success( ) {
390+ return Builder :: new_synced_database( path, url, auth_token)
391+ . remote_writes( true )
392+ . read_your_writes( read_your_writes)
393+ . build( )
394+ . await ;
395+ }
396+
397+ }
398+ SyncProtocol :: V1 => { }
399+ }
400+ }
401+
345402 let path = path. to_str( ) . ok_or( crate :: Error :: InvalidUTF8Path ) ?. to_owned( ) ;
346403
347404 let db = if !skip_safety_assert {
0 commit comments