11use std:: cmp:: Reverse ;
22use std:: collections:: { HashMap , VecDeque } ;
33
4- use tokio:: sync:: mpsc;
5-
64use super :: job:: { IndexedRequest , Job , JobResult } ;
75use super :: StoreSegmentRequest ;
86use libsql_sys:: name:: NamespaceName ;
@@ -32,16 +30,14 @@ impl<C, F> Default for NamespaceRequests<C, F> {
3230/// It is generic over C: the storage config type (for config overrides), and T, the segment type
3331pub ( crate ) struct Scheduler < C , T > {
3432 /// notify new durability index for namespace
35- durable_notifier : mpsc:: Sender < ( NamespaceName , u64 ) > ,
3633 requests : HashMap < NamespaceName , NamespaceRequests < C , T > > ,
3734 queue : priority_queue:: PriorityQueue < NamespaceName , Reverse < u64 > > ,
3835 next_request_id : u64 ,
3936}
4037
4138impl < C , T > Scheduler < C , T > {
42- pub fn new ( durable_notifier : mpsc :: Sender < ( NamespaceName , u64 ) > ) -> Self {
39+ pub fn new ( ) -> Self {
4340 Self {
44- durable_notifier,
4541 requests : Default :: default ( ) ,
4642 queue : Default :: default ( ) ,
4743 next_request_id : Default :: default ( ) ,
@@ -108,14 +104,6 @@ impl<C, T> Scheduler<C, T> {
108104 Ok ( durable_index) => {
109105 tracing:: debug!( "job success registered" ) ;
110106 ( result. job . request . request . on_store_callback ) ( durable_index) . await ;
111- if self
112- . durable_notifier
113- . send ( ( name. clone ( ) , durable_index) )
114- . await
115- . is_err ( )
116- {
117- tracing:: warn!( "durability notifier was closed, proceeding anyway" ) ;
118- }
119107 }
120108 Err ( e) => {
121109 tracing:: error!( "error processing request, re-enqueuing: {e}" ) ;
@@ -154,6 +142,7 @@ mod test {
154142 use std:: future:: ready;
155143
156144 use chrono:: Utc ;
145+ use tokio:: sync:: oneshot;
157146
158147 use crate :: storage:: Error ;
159148 use libsql_sys:: name:: NamespaceName ;
@@ -162,29 +151,34 @@ mod test {
162151
163152 #[ tokio:: test]
164153 async fn schedule_simple ( ) {
165- let ( sender, mut receiver) = tokio:: sync:: mpsc:: channel ( 10 ) ;
166- let mut scheduler = Scheduler :: < ( ) , ( ) > :: new ( sender) ;
154+ let mut scheduler = Scheduler :: < ( ) , ( ) > :: new ( ) ;
167155
168156 let ns1 = NamespaceName :: from ( "test1" ) ;
169157 let ns2 = NamespaceName :: from ( "test2" ) ;
170158
159+ let ( job_1_snd, job_1_rcv) = oneshot:: channel ( ) ;
171160 scheduler. register (
172161 StoreSegmentRequest {
173162 namespace : ns1. clone ( ) ,
174163 segment : ( ) ,
175164 created_at : Utc :: now ( ) ,
176165 storage_config_override : None ,
177- on_store_callback : Box :: new ( |_| Box :: pin ( ready ( ( ) ) ) ) ,
166+ on_store_callback : Box :: new ( move |n| Box :: pin ( async move {
167+ let _ = job_1_snd. send ( n) ;
168+ } ) ) ,
178169 } ,
179170 ) ;
180171
172+ let ( job_2_snd, job_2_rcv) = oneshot:: channel ( ) ;
181173 scheduler. register (
182174 StoreSegmentRequest {
183175 namespace : ns2. clone ( ) ,
184176 segment : ( ) ,
185177 created_at : Utc :: now ( ) ,
186178 storage_config_override : None ,
187- on_store_callback : Box :: new ( |_| Box :: pin ( ready ( ( ) ) ) ) ,
179+ on_store_callback : Box :: new ( move |n| Box :: pin ( async move {
180+ let _ = job_2_snd. send ( n) ;
181+ } ) ) ,
188182 } ,
189183 ) ;
190184
@@ -194,7 +188,7 @@ mod test {
194188 segment : ( ) ,
195189 created_at : Utc :: now ( ) ,
196190 storage_config_override : None ,
197- on_store_callback : Box :: new ( |_| Box :: pin ( ready ( ( ) ) ) ) ,
191+ on_store_callback : Box :: new ( move |_| Box :: pin ( ready ( ( ) ) ) ) ,
198192 } ,
199193 ) ;
200194
@@ -215,15 +209,15 @@ mod test {
215209 . await ;
216210
217211 assert ! ( scheduler. schedule( ) . is_none( ) ) ;
218- assert_eq ! ( receiver . recv ( ) . await . unwrap( ) , ( ns2 . clone ( ) , 42 ) ) ;
212+ assert_eq ! ( job_2_rcv . await . unwrap( ) , 42 ) ;
219213
220214 scheduler
221215 . report ( JobResult {
222216 job : job1,
223217 result : Ok ( 10 ) ,
224218 } )
225219 . await ;
226- assert_eq ! ( receiver . recv ( ) . await . unwrap( ) , ( ns1 . clone ( ) , 10 ) ) ;
220+ assert_eq ! ( job_1_rcv . await . unwrap( ) , 10 ) ;
227221
228222 let job1 = scheduler. schedule ( ) . unwrap ( ) ;
229223 assert_eq ! ( job1. request. request. namespace, ns1) ;
@@ -232,8 +226,7 @@ mod test {
232226
233227 #[ tokio:: test]
234228 async fn job_error_reschedule ( ) {
235- let ( sender, _) = tokio:: sync:: mpsc:: channel ( 10 ) ;
236- let mut scheduler = Scheduler :: < ( ) , ( ) > :: new ( sender) ;
229+ let mut scheduler = Scheduler :: < ( ) , ( ) > :: new ( ) ;
237230
238231 let ns1 = NamespaceName :: from ( "test1" ) ;
239232 let ns2 = NamespaceName :: from ( "test2" ) ;
@@ -275,8 +268,7 @@ mod test {
275268
276269 #[ tokio:: test]
277270 async fn schedule_while_in_flight ( ) {
278- let ( sender, _) = tokio:: sync:: mpsc:: channel ( 10 ) ;
279- let mut scheduler = Scheduler :: < ( ) , ( ) > :: new ( sender) ;
271+ let mut scheduler = Scheduler :: < ( ) , ( ) > :: new ( ) ;
280272
281273 let ns1 = NamespaceName :: from ( "test1" ) ;
282274
0 commit comments