Skip to content

Commit 4069036

Browse files
committed
cancel query when request is dropped
1 parent 9d15ebf commit 4069036

1 file changed

Lines changed: 47 additions & 2 deletions

File tree

libsql-server/src/connection/libsql.rs

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,14 +391,43 @@ where
391391
ctx: RequestContext,
392392
builder: B,
393393
) -> Result<(B, Program)> {
394+
struct Bomb {
395+
canceled: Arc<AtomicBool>,
396+
defused: bool,
397+
}
398+
399+
impl Drop for Bomb {
400+
fn drop(&mut self) {
401+
if !self.defused {
402+
tracing::debug!("cancelling request");
403+
self.canceled.store(true, Ordering::Relaxed);
404+
}
405+
}
406+
}
407+
408+
let canceled = {
409+
let cancelled = self.inner.lock().canceled.clone();
410+
cancelled.store(false, Ordering::Relaxed);
411+
cancelled
412+
};
413+
414+
let mut bomb = Bomb {
415+
canceled,
416+
defused: false,
417+
};
418+
394419
PROGRAM_EXEC_COUNT.increment(1);
395420

396421
check_program_auth(&ctx, &pgm, &self.inner.lock().config_store.get())?;
397422
let conn = self.inner.clone();
398-
BLOCKING_RT
423+
let ret = BLOCKING_RT
399424
.spawn_blocking(move || Connection::run(conn, pgm, builder))
400425
.await
401-
.unwrap()
426+
.unwrap();
427+
428+
bomb.defused = true;
429+
430+
ret
402431
}
403432
}
404433

@@ -413,6 +442,7 @@ pub(super) struct Connection<W> {
413442
forced_rollback: bool,
414443
broadcaster: BroadcasterHandle,
415444
hooked: bool,
445+
canceled: Arc<AtomicBool>,
416446
}
417447

418448
fn update_stats(
@@ -475,6 +505,19 @@ impl<W: Wal> Connection<W> {
475505
);
476506
}
477507

508+
let canceled = Arc::new(AtomicBool::new(false));
509+
510+
conn.progress_handler(100, {
511+
let canceled = canceled.clone();
512+
Some(move || {
513+
let canceled = canceled.load(Ordering::Relaxed);
514+
if canceled {
515+
tracing::debug!("request canceled");
516+
}
517+
canceled
518+
})
519+
});
520+
478521
let this = Self {
479522
conn,
480523
stats,
@@ -486,6 +529,7 @@ impl<W: Wal> Connection<W> {
486529
forced_rollback: false,
487530
broadcaster,
488531
hooked: false,
532+
canceled,
489533
};
490534

491535
for ext in extensions.iter() {
@@ -795,6 +839,7 @@ mod test {
795839
forced_rollback: false,
796840
broadcaster: Default::default(),
797841
hooked: false,
842+
canceled: Arc::new(false.into()),
798843
};
799844

800845
let conn = Arc::new(Mutex::new(conn));

0 commit comments

Comments
 (0)