@@ -13,12 +13,13 @@ use crate::error::Error;
1313use crate :: http:: user:: timing:: sample_time;
1414use crate :: metrics:: {
1515 CONCURRENT_CONNECTIONS_COUNT , CONNECTION_ALIVE_DURATION , CONNECTION_CREATE_TIME ,
16+ TOTAL_RESPONSE_SIZE_HIST ,
1617} ;
1718use crate :: namespace:: meta_store:: MetaStore ;
1819use crate :: namespace:: NamespaceName ;
1920use crate :: query:: { Params , Query } ;
2021use crate :: query_analysis:: Statement ;
21- use crate :: query_result_builder:: { IgnoreResult , QueryResultBuilder } ;
22+ use crate :: query_result_builder:: { IgnoreResult , QueryResultBuilder , TOTAL_RESPONSE_SIZE } ;
2223use crate :: replication:: FrameNo ;
2324use crate :: Result ;
2425
@@ -205,6 +206,7 @@ pub trait MakeConnection: Send + Sync + 'static {
205206 timeout : Option < Duration > ,
206207 max_total_response_size : u64 ,
207208 max_concurrent_requests : u64 ,
209+ disable_intelligent_throttling : bool ,
208210 ) -> MakeThrottledConnection < Self >
209211 where
210212 Self : Sized ,
@@ -215,6 +217,7 @@ pub trait MakeConnection: Send + Sync + 'static {
215217 timeout,
216218 max_total_response_size,
217219 max_concurrent_requests,
220+ disable_intelligent_throttling,
218221 )
219222 }
220223
@@ -280,6 +283,7 @@ pub struct MakeThrottledConnection<F> {
280283 max_total_response_size : u64 ,
281284 waiters : AtomicUsize ,
282285 max_concurrent_requests : u64 ,
286+ disable_intelligent_throttling : bool ,
283287}
284288
285289impl < F > MakeThrottledConnection < F > {
@@ -289,6 +293,7 @@ impl<F> MakeThrottledConnection<F> {
289293 timeout : Option < Duration > ,
290294 max_total_response_size : u64 ,
291295 max_concurrent_requests : u64 ,
296+ disable_intelligent_throttling : bool ,
292297 ) -> Self {
293298 Self {
294299 semaphore,
@@ -297,12 +302,16 @@ impl<F> MakeThrottledConnection<F> {
297302 max_total_response_size,
298303 waiters : AtomicUsize :: new ( 0 ) ,
299304 max_concurrent_requests,
305+ disable_intelligent_throttling,
300306 }
301307 }
302308
303309 // How many units should be acquired from the semaphore,
304310 // depending on current memory pressure.
305311 fn units_to_take ( & self ) -> u32 {
312+ if self . disable_intelligent_throttling {
313+ return 1 ;
314+ }
306315 let total_response_size = crate :: query_result_builder:: TOTAL_RESPONSE_SIZE
307316 . load ( std:: sync:: atomic:: Ordering :: Relaxed ) as u64 ;
308317 if total_response_size * 2 > self . max_total_response_size {
@@ -352,6 +361,8 @@ impl<F: MakeConnection> MakeConnection for MakeThrottledConnection<F> {
352361 "Available semaphore units: {}" ,
353362 self . semaphore. available_permits( )
354363 ) ;
364+ TOTAL_RESPONSE_SIZE_HIST
365+ . record ( TOTAL_RESPONSE_SIZE . load ( std:: sync:: atomic:: Ordering :: Relaxed ) as f64 ) ;
355366 let units = self . units_to_take ( ) ;
356367 let waiters_guard = WaitersGuard :: new ( & self . waiters ) ;
357368 if ( waiters_guard. waiters . load ( Ordering :: Relaxed ) as u64 ) >= self . max_concurrent_requests {
@@ -519,6 +530,7 @@ pub mod test {
519530 Some ( Duration :: from_millis ( 100 ) ) ,
520531 u64:: MAX ,
521532 u64:: MAX ,
533+ false ,
522534 ) ;
523535
524536 let mut conns = Vec :: with_capacity ( 10 ) ;
0 commit comments