@@ -11,6 +11,19 @@ use crate::registry::WalRegistry;
1111
1212pub ( crate ) type NotifyCheckpointer = mpsc:: Sender < NamespaceName > ;
1313
14+ pub enum CheckpointMessage {
15+ /// notify that a namespace may be checkpointable
16+ Namespace ( NamespaceName ) ,
17+ /// shutdown initiated
18+ Shutdown ,
19+ }
20+
21+ impl From < NamespaceName > for CheckpointMessage {
22+ fn from ( value : NamespaceName ) -> Self {
23+ Self :: Namespace ( value)
24+ }
25+ }
26+
1427pub type LibsqlCheckpointer < IO , S > = Checkpointer < WalRegistry < IO , S > > ;
1528
1629impl < IO , S > LibsqlCheckpointer < IO , S >
2033{
2134 pub fn new (
2235 registry : Arc < WalRegistry < IO , S > > ,
23- notifier : mpsc:: Receiver < NamespaceName > ,
36+ notifier : mpsc:: Receiver < CheckpointMessage > ,
2437 max_checkpointing_conccurency : usize ,
2538 ) -> Self {
2639 Self :: new_with_performer ( registry, notifier, max_checkpointing_conccurency)
@@ -70,7 +83,7 @@ pub struct Checkpointer<P> {
7083 checkpointing : HashSet < NamespaceName > ,
7184 /// the checkpointer is notifier whenever there is a change to a namespage that could trigger a
7285 /// checkpoint
73- recv : mpsc:: Receiver < NamespaceName > ,
86+ recv : mpsc:: Receiver < CheckpointMessage > ,
7487 max_checkpointing_conccurency : usize ,
7588 shutting_down : bool ,
7689 join_set : JoinSet < ( NamespaceName , crate :: error:: Result < ( ) > ) > ,
8598{
8699 fn new_with_performer (
87100 perform_checkpoint : Arc < P > ,
88- notifier : mpsc:: Receiver < NamespaceName > ,
101+ notifier : mpsc:: Receiver < CheckpointMessage > ,
89102 max_checkpointing_conccurency : usize ,
90103 ) -> Self {
91104 Self {
@@ -141,10 +154,10 @@ where
141154 }
142155 notified = self . recv. recv( ) , if !self . shutting_down => {
143156 match notified {
144- Some ( namespace) => {
157+ Some ( CheckpointMessage :: Namespace ( namespace) ) => {
145158 self . scheduled. insert( namespace) ;
146159 }
147- None => {
160+ None | Some ( CheckpointMessage :: Shutdown ) => {
148161 self . shutting_down = true ;
149162 }
150163 }
@@ -204,7 +217,7 @@ mod test {
204217 let mut checkpointer = Checkpointer :: new_with_performer ( TestPerformCheckoint . into ( ) , receiver, 5 ) ;
205218 let ns = NamespaceName :: from ( "test" ) ;
206219
207- sender. send ( ns. clone ( ) ) . await . unwrap ( ) ;
220+ sender. send ( ns. clone ( ) . into ( ) ) . await . unwrap ( ) ;
208221
209222 checkpointer. step ( ) . await ;
210223
@@ -236,7 +249,7 @@ mod test {
236249 let mut checkpointer = Checkpointer :: new_with_performer ( TestPerformCheckoint . into ( ) , receiver, 5 ) ;
237250 let ns = NamespaceName :: from ( "test" ) ;
238251
239- sender. send ( ns. clone ( ) ) . await . unwrap ( ) ;
252+ sender. send ( ns. clone ( ) . into ( ) ) . await . unwrap ( ) ;
240253
241254 checkpointer. step ( ) . await ;
242255 assert_eq ! ( checkpointer. errors, 0 ) ;
@@ -327,8 +340,8 @@ mod test {
327340
328341 let ns: NamespaceName = "test" . into ( ) ;
329342
330- sender. send ( ns. clone ( ) ) . await . unwrap ( ) ;
331- sender. send ( ns. clone ( ) ) . await . unwrap ( ) ;
343+ sender. send ( ns. clone ( ) . into ( ) ) . await . unwrap ( ) ;
344+ sender. send ( ns. clone ( ) . into ( ) ) . await . unwrap ( ) ;
332345
333346 checkpointer. step ( ) . await ;
334347
@@ -360,8 +373,8 @@ mod test {
360373 let ns1: NamespaceName = "test1" . into ( ) ;
361374 let ns2: NamespaceName = "test2" . into ( ) ;
362375
363- sender. send ( ns1. clone ( ) ) . await . unwrap ( ) ;
364- sender. send ( ns2. clone ( ) ) . await . unwrap ( ) ;
376+ sender. send ( ns1. clone ( ) . into ( ) ) . await . unwrap ( ) ;
377+ sender. send ( ns2. clone ( ) . into ( ) ) . await . unwrap ( ) ;
365378
366379 checkpointer. step ( ) . await ;
367380
@@ -396,9 +409,9 @@ mod test {
396409 let ns2: NamespaceName = "test2" . into ( ) ;
397410 let ns3: NamespaceName = "test3" . into ( ) ;
398411
399- sender. send ( ns1. clone ( ) ) . await . unwrap ( ) ;
400- sender. send ( ns2. clone ( ) ) . await . unwrap ( ) ;
401- sender. send ( ns3. clone ( ) ) . await . unwrap ( ) ;
412+ sender. send ( ns1. clone ( ) . into ( ) ) . await . unwrap ( ) ;
413+ sender. send ( ns2. clone ( ) . into ( ) ) . await . unwrap ( ) ;
414+ sender. send ( ns3. clone ( ) . into ( ) ) . await . unwrap ( ) ;
402415
403416 checkpointer. step ( ) . await ;
404417 checkpointer. step ( ) . await ;
0 commit comments