@@ -4,11 +4,13 @@ use roaring::RoaringBitmap;
44use tokio:: sync:: watch;
55use tokio_stream:: { Stream , StreamExt } ;
66
7- use crate :: error:: Result ;
87use crate :: io:: Io ;
8+ use crate :: replication:: Error ;
99use crate :: segment:: Frame ;
1010use crate :: shared_wal:: SharedWal ;
1111
12+ use super :: Result ;
13+
1214pub struct Replicator < IO : Io > {
1315 shared : Arc < SharedWal < IO > > ,
1416 new_frame_notifier : watch:: Receiver < u64 > ,
@@ -38,7 +40,7 @@ impl<IO: Io> Replicator<IO> {
3840 ///
3941 /// In a single replication step, the replicator guarantees that a minimal set of frames is
4042 /// sent to the replica.
41- pub fn frame_stream ( & mut self ) -> impl Stream < Item = Result < Frame > > + ' _ {
43+ pub fn frame_stream ( & mut self ) -> impl Stream < Item = Result < Box < Frame > > > + ' _ {
4244 async_stream:: try_stream! {
4345 loop {
4446 let most_recent_frame_no = * self
@@ -54,6 +56,7 @@ impl<IO: Io> Replicator<IO> {
5456 let ( stream, replicated_until, size_after) = current. frame_stream_from( self . next_frame_no, & mut seen) ;
5557 let should_replicate_from_tail = replicated_until != self . next_frame_no;
5658
59+
5760 // replicate from current
5861 {
5962 tokio:: pin!( stream) ;
@@ -62,7 +65,7 @@ impl<IO: Io> Replicator<IO> {
6265
6366 loop {
6467 let Some ( frame) = stream. next( ) . await else { break } ;
65- let mut frame = frame?;
68+ let mut frame = frame. map_err ( |e| Error :: CurrentSegment ( e . into ( ) ) ) ?;
6669 commit_frame_no = frame. header( ) . frame_no( ) . max( commit_frame_no) ;
6770 if stream. peek( ) . await . is_none( ) && !should_replicate_from_tail {
6871 frame. header_mut( ) . set_size_after( size_after) ;
0 commit comments