Skip to content

Commit 48af2ee

Browse files
committed
pass connection maker to migration scheduler
1 parent 5422921 commit 48af2ee

3 files changed

Lines changed: 53 additions & 60 deletions

File tree

libsql-server/src/query_result_builder.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,6 @@ impl<B: QueryResultBuilder> QueryResultBuilder for Take<B> {
525525
pub mod test {
526526
use std::fmt;
527527

528-
use crate::connection::program::Program;
529528
use arbitrary::{Arbitrary, Unstructured};
530529
use itertools::Itertools;
531530
use rand::{
@@ -1035,16 +1034,13 @@ pub mod test {
10351034
fn add_stats(&mut self, _rows_read: u64, _rows_written: u64, _duration: Duration) {}
10361035
}
10371036

1038-
pub fn test_driver(
1039-
iter: usize,
1040-
f: impl Fn(FsmQueryBuilder) -> crate::Result<(FsmQueryBuilder, Program)>,
1041-
) {
1037+
pub fn test_driver(iter: usize, f: impl Fn(FsmQueryBuilder) -> crate::Result<FsmQueryBuilder>) {
10421038
for _ in 0..iter {
10431039
// inject random errors
10441040
let builder = FsmQueryBuilder::new(true);
10451041
match f(builder) {
10461042
Ok(b) => {
1047-
assert_eq!(b.0.state, Finish);
1043+
assert_eq!(b.state, Finish);
10481044
}
10491045
Err(e) => {
10501046
assert!(matches!(e, crate::Error::BuilderError(_)));

libsql-server/src/rpc/proxy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ pub mod rpc {
222222
impl From<connection::program::Program> for Program {
223223
fn from(pgm: connection::program::Program) -> Self {
224224
Self {
225-
steps: pgm.steps.into_iter().map(|s| s.into()).collect(),
225+
steps: pgm.steps.iter().map(|s| s.clone().into()).collect(),
226226
}
227227
}
228228
}

libsql-server/src/schema/scheduler.rs

Lines changed: 50 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use tokio::task::JoinSet;
1010

1111
use crate::connection::program::Program;
1212
use crate::connection::{Connection, MakeConnection};
13-
use crate::database::PrimaryConnectionMaker;
1413
use crate::namespace::meta_store::{MetaStore, MetaStoreConnection};
1514
use crate::namespace::{NamespaceName, NamespaceStore};
1615
use crate::query_result_builder::{IgnoreResult, QueryBuilderConfig};
@@ -346,16 +345,20 @@ impl Scheduler {
346345

347346
// enqueue some work
348347
if let Some(task) = self.current_batch.pop() {
349-
let (connection_maker, block_writes) =
350-
self.namespace_store
351-
.with(task.namespace(), move |ns| {
352-
let db = ns.db.as_primary().expect(
353-
"attempting to perform schema migration on non-primary database",
354-
);
355-
(db.connection_maker().clone(), db.block_writes.clone())
356-
})
357-
.await
358-
.map_err(|e| Error::NamespaceLoad(Box::new(e)))?;
348+
let (connection_maker, block_writes) = self
349+
.namespace_store
350+
.with(task.namespace(), move |ns| {
351+
assert!(
352+
ns.db.is_primary(),
353+
"attempting to perform schema migration on non-primary database"
354+
);
355+
(
356+
ns.db.connection_maker().clone(),
357+
ns.db.block_writes().unwrap(),
358+
)
359+
})
360+
.await
361+
.map_err(|e| Error::NamespaceLoad(Box::new(e)))?;
359362

360363
// we block the writes before enqueuing the task, it makes testing predictable
361364
if *task.status() == MigrationTaskStatus::Enqueued {
@@ -426,7 +429,7 @@ async fn try_step_task(
426429
_permit: OwnedSemaphorePermit,
427430
namespace_store: NamespaceStore,
428431
migration_db: Arc<Mutex<MetaStoreConnection>>,
429-
connection_maker: Arc<PrimaryConnectionMaker>,
432+
connection_maker: Arc<dyn MakeConnection<Connection = crate::database::Connection>>,
430433
job_status: MigrationJobStatus,
431434
migration: Arc<Program>,
432435
mut task: MigrationTask,
@@ -477,7 +480,7 @@ async fn try_step_task(
477480

478481
async fn try_step_task_inner(
479482
namespace_store: NamespaceStore,
480-
connection_maker: Arc<PrimaryConnectionMaker>,
483+
connection_maker: Arc<dyn MakeConnection<Connection = crate::database::Connection>>,
481484
job_status: MigrationJobStatus,
482485
migration: Arc<Program>,
483486
task: &MigrationTask,
@@ -739,11 +742,11 @@ async fn step_job_run_success(
739742
// TODO: check that all tasks actually reported success before migration
740743
let connection_maker = namespace_store
741744
.with(schema.clone(), |ns| {
742-
ns.db
743-
.as_schema()
744-
.expect("expected database to be a schema database")
745-
.connection_maker()
746-
.clone()
745+
assert!(
746+
ns.db.is_schema(),
747+
"expected database to be a schema database"
748+
);
749+
ns.db.connection_maker()
747750
})
748751
.await
749752
.map_err(|e| Error::NamespaceLoad(Box::new(e)))?;
@@ -753,30 +756,28 @@ async fn step_job_run_success(
753756
.await
754757
.map_err(|e| Error::FailedToConnect(schema.clone(), e.into()))?;
755758
tokio::task::spawn_blocking(move || -> Result<(), Error> {
756-
connection
757-
.connection()
758-
.with_raw(|conn| -> Result<(), Error> {
759-
let mut txn = conn.transaction()?;
760-
let schema_version =
761-
txn.query_row("PRAGMA schema_version", (), |row| row.get::<_, i64>(0))?;
762-
763-
if schema_version != job_id {
764-
// todo: use proper builder and collect errors
765-
let (ret, _status) = perform_migration(
766-
&mut txn,
767-
&migration,
768-
false,
769-
IgnoreResult,
770-
&QueryBuilderConfig::default(),
771-
);
772-
let _error = ret.err().map(|e| e.to_string());
773-
txn.pragma_update(None, "schema_version", job_id)?;
774-
// update schema version to job_id?
775-
txn.commit()?;
776-
}
759+
connection.with_raw(|conn| -> Result<(), Error> {
760+
let mut txn = conn.transaction()?;
761+
let schema_version =
762+
txn.query_row("PRAGMA schema_version", (), |row| row.get::<_, i64>(0))?;
763+
764+
if schema_version != job_id {
765+
// todo: use proper builder and collect errors
766+
let (ret, _status) = perform_migration(
767+
&mut txn,
768+
&migration,
769+
false,
770+
IgnoreResult,
771+
&QueryBuilderConfig::default(),
772+
);
773+
let _error = ret.err().map(|e| e.to_string());
774+
txn.pragma_update(None, "schema_version", job_id)?;
775+
// update schema version to job_id?
776+
txn.commit()?;
777+
}
777778

778-
Ok(())
779-
})
779+
Ok(())
780+
})
780781
})
781782
.await
782783
.expect("task panicked")?;
@@ -809,7 +810,7 @@ mod test {
809810
use crate::connection::config::DatabaseConfig;
810811
use crate::database::DatabaseKind;
811812
use crate::namespace::configurator::{
812-
BaseNamespaceConfig, NamespaceConfigurators, PrimaryConfigurator, PrimaryExtraConfig,
813+
BaseNamespaceConfig, NamespaceConfigurators, PrimaryConfig, PrimaryConfigurator,
813814
SchemaConfigurator,
814815
};
815816
use crate::namespace::meta_store::{metastore_connection_maker, MetaStore};
@@ -863,10 +864,8 @@ mod test {
863864

864865
let (block_write, ns_conn_maker) = store
865866
.with("ns".into(), |ns| {
866-
(
867-
ns.db.as_primary().unwrap().block_writes.clone(),
868-
ns.db.as_primary().unwrap().connection_maker(),
869-
)
867+
assert!(ns.db.is_primary());
868+
(ns.db.block_writes().unwrap(), ns.db.connection_maker())
870869
})
871870
.await
872871
.unwrap();
@@ -920,7 +919,7 @@ mod test {
920919
encryption_config: None,
921920
};
922921

923-
let primary_config = PrimaryExtraConfig {
922+
let primary_config = PrimaryConfig {
924923
max_log_size: 1000000000,
925924
max_log_duration: None,
926925
bottomless_replication: None,
@@ -989,10 +988,8 @@ mod test {
989988

990989
let (block_write, ns_conn_maker) = store
991990
.with("ns".into(), |ns| {
992-
(
993-
ns.db.as_primary().unwrap().block_writes.clone(),
994-
ns.db.as_primary().unwrap().connection_maker(),
995-
)
991+
assert!(ns.db.is_primary());
992+
(ns.db.block_writes().unwrap(), ns.db.connection_maker())
996993
})
997994
.await
998995
.unwrap();
@@ -1040,11 +1037,11 @@ mod test {
10401037

10411038
store
10421039
.with("ns".into(), |ns| {
1040+
assert!(ns.db.is_primary());
10431041
assert!(ns
10441042
.db
1045-
.as_primary()
1043+
.block_writes()
10461044
.unwrap()
1047-
.block_writes
10481045
.load(std::sync::atomic::Ordering::Relaxed));
10491046
})
10501047
.await

0 commit comments

Comments
 (0)