Skip to content

Commit fd03144

Browse files
committed
decoupled namespace configurators
1 parent 907f2f9 commit fd03144

11 files changed

Lines changed: 1056 additions & 806 deletions

File tree

libsql-server/src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use tonic::metadata::errors::InvalidMetadataValueBytes;
44

55
use crate::{
66
auth::AuthError,
7-
namespace::{ForkError, NamespaceName},
7+
namespace::{configurator::fork::ForkError, NamespaceName},
88
query_result_builder::QueryResultBuilderError,
99
};
1010

libsql-server/src/lib.rs

Lines changed: 56 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use libsql_wal::registry::WalRegistry;
4646
use libsql_wal::storage::NoStorage;
4747
use libsql_wal::wal::LibsqlWalManager;
4848
use namespace::meta_store::MetaStoreHandle;
49-
use namespace::{NamespaceConfig, NamespaceName};
49+
use namespace::NamespaceName;
5050
use net::Connector;
5151
use once_cell::sync::Lazy;
5252
use rusqlite::ffi::SQLITE_CONFIG_MALLOC;
@@ -60,7 +60,7 @@ use utils::services::idle_shutdown::IdleShutdownKicker;
6060

6161
use self::config::MetaStoreConfig;
6262
use self::connection::connection_manager::InnerWalManager;
63-
use self::namespace::configurator::NamespaceConfigurators;
63+
use self::namespace::configurator::{BaseNamespaceConfig, NamespaceConfigurators, PrimaryConfigurator, PrimaryExtraConfig, ReplicaConfigurator, SchemaConfigurator};
6464
use self::namespace::NamespaceStore;
6565
use self::net::AddrIncoming;
6666
use self::replication::script_backup_manager::{CommandHandler, ScriptBackupManager};
@@ -425,11 +425,6 @@ where
425425
let user_auth_strategy = self.user_api_config.auth_strategy.clone();
426426

427427
let service_shutdown = Arc::new(Notify::new());
428-
let db_kind = if self.rpc_client_config.is_some() {
429-
DatabaseKind::Replica
430-
} else {
431-
DatabaseKind::Primary
432-
};
433428

434429
let scripted_backup = match self.db_config.snapshot_exec {
435430
Some(ref command) => {
@@ -457,27 +452,6 @@ where
457452
// chose the wal backend
458453
let (make_wal_manager, registry_shutdown) = self.configure_wal_manager(&mut join_set)?;
459454

460-
let ns_config = NamespaceConfig {
461-
db_kind,
462-
base_path: self.path.clone(),
463-
max_log_size: self.db_config.max_log_size,
464-
max_log_duration: self.db_config.max_log_duration.map(Duration::from_secs_f32),
465-
bottomless_replication: self.db_config.bottomless_replication.clone(),
466-
extensions,
467-
stats_sender: stats_sender.clone(),
468-
max_response_size: self.db_config.max_response_size,
469-
max_total_response_size: self.db_config.max_total_response_size,
470-
checkpoint_interval: self.db_config.checkpoint_interval,
471-
encryption_config: self.db_config.encryption_config.clone(),
472-
max_concurrent_connections: Arc::new(Semaphore::new(self.max_concurrent_connections)),
473-
scripted_backup,
474-
max_concurrent_requests: self.db_config.max_concurrent_requests,
475-
channel: channel.clone(),
476-
uri: uri.clone(),
477-
migration_scheduler: scheduler_sender.into(),
478-
make_wal_manager,
479-
};
480-
481455
let (metastore_conn_maker, meta_store_wal_manager) =
482456
metastore_connection_maker(self.meta_store_config.bottomless.clone(), &self.path)
483457
.await?;
@@ -490,15 +464,67 @@ where
490464
)
491465
.await?;
492466

493-
let configurators = NamespaceConfigurators::default();
467+
let base_config = BaseNamespaceConfig {
468+
base_path: self.path.clone(),
469+
extensions,
470+
stats_sender,
471+
max_response_size: self.db_config.max_response_size,
472+
max_total_response_size: self.db_config.max_total_response_size,
473+
max_concurrent_connections: Arc::new(Semaphore::new(self.max_concurrent_connections)),
474+
max_concurrent_requests: self.db_config.max_concurrent_requests,
475+
};
476+
477+
let mut configurators = NamespaceConfigurators::default();
478+
479+
let db_kind = match channel.clone().zip(uri.clone()) {
480+
// replica mode
481+
Some((channel, uri)) => {
482+
let replica_configurator = ReplicaConfigurator::new(
483+
base_config,
484+
channel,
485+
uri,
486+
make_wal_manager,
487+
);
488+
configurators.with_replica(replica_configurator);
489+
DatabaseKind::Replica
490+
}
491+
// primary mode
492+
None => {
493+
let primary_config = PrimaryExtraConfig {
494+
max_log_size: self.db_config.max_log_size,
495+
max_log_duration: self.db_config.max_log_duration.map(Duration::from_secs_f32),
496+
bottomless_replication: self.db_config.bottomless_replication.clone(),
497+
scripted_backup,
498+
checkpoint_interval: self.db_config.checkpoint_interval,
499+
};
500+
501+
let primary_configurator = PrimaryConfigurator::new(
502+
base_config.clone(),
503+
primary_config.clone(),
504+
make_wal_manager.clone(),
505+
);
506+
507+
let schema_configurator = SchemaConfigurator::new(
508+
base_config.clone(),
509+
primary_config,
510+
make_wal_manager.clone(),
511+
scheduler_sender.into(),
512+
);
513+
514+
configurators.with_schema(schema_configurator);
515+
configurators.with_primary(primary_configurator);
516+
517+
DatabaseKind::Primary
518+
},
519+
};
494520

495521
let namespace_store: NamespaceStore = NamespaceStore::new(
496522
db_kind.is_replica(),
497523
self.db_config.snapshot_at_shutdown,
498524
self.max_active_namespaces,
499-
ns_config,
500525
meta_store,
501526
configurators,
527+
db_kind,
502528
)
503529
.await?;
504530

libsql-server/src/namespace/fork.rs renamed to libsql-server/src/namespace/configurator/fork.rs

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,71 @@ use tokio::io::{AsyncSeekExt, AsyncWriteExt};
1212
use tokio::time::Duration;
1313
use tokio_stream::StreamExt;
1414

15+
use crate::database::Database;
16+
use crate::namespace::meta_store::MetaStoreHandle;
17+
use crate::namespace::{Namespace, NamespaceBottomlessDbId};
1518
use crate::replication::primary::frame_stream::FrameStream;
1619
use crate::replication::{LogReadError, ReplicationLogger};
1720
use crate::{BLOCKING_RT, LIBSQL_PAGE_SIZE};
1821

19-
use super::meta_store::MetaStoreHandle;
20-
use super::{NamespaceName, NamespaceStore, RestoreOption};
22+
use super::helpers::make_bottomless_options;
23+
use super::{NamespaceName, NamespaceStore, PrimaryExtraConfig, RestoreOption};
2124

2225
type Result<T> = crate::Result<T, ForkError>;
2326

27+
pub(super) async fn fork(
28+
from_ns: &Namespace,
29+
from_config: MetaStoreHandle,
30+
to_ns: NamespaceName,
31+
to_config: MetaStoreHandle,
32+
timestamp: Option<NaiveDateTime>,
33+
store: NamespaceStore,
34+
primary_config: &PrimaryExtraConfig,
35+
base_path: Arc<Path>,
36+
) -> crate::Result<Namespace> {
37+
let from_config = from_config.get();
38+
let bottomless_db_id = NamespaceBottomlessDbId::from_config(&from_config);
39+
let restore_to = if let Some(timestamp) = timestamp {
40+
if let Some(ref options) = primary_config.bottomless_replication {
41+
Some(PointInTimeRestore {
42+
timestamp,
43+
replicator_options: make_bottomless_options(
44+
options,
45+
bottomless_db_id.clone(),
46+
from_ns.name().clone(),
47+
),
48+
})
49+
} else {
50+
return Err(crate::Error::Fork(ForkError::BackupServiceNotConfigured));
51+
}
52+
} else {
53+
None
54+
};
55+
56+
let logger = match &from_ns.db {
57+
Database::Primary(db) => db.wal_wrapper.wrapper().logger(),
58+
Database::Schema(db) => db.wal_wrapper.wrapper().logger(),
59+
_ => {
60+
return Err(crate::Error::Fork(ForkError::Internal(anyhow::Error::msg(
61+
"Invalid source database type for fork",
62+
))));
63+
}
64+
};
65+
66+
let fork_task = ForkTask {
67+
base_path,
68+
to_namespace: to_ns.clone(),
69+
logger,
70+
restore_to,
71+
to_config,
72+
store,
73+
};
74+
75+
let ns = fork_task.fork().await?;
76+
77+
Ok(ns)
78+
}
79+
2480
#[derive(Debug, thiserror::Error)]
2581
pub enum ForkError {
2682
#[error("internal error: {0}")]
@@ -58,7 +114,7 @@ pub struct ForkTask {
58114
pub to_namespace: NamespaceName,
59115
pub to_config: MetaStoreHandle,
60116
pub restore_to: Option<PointInTimeRestore>,
61-
pub store: NamespaceStore,
117+
pub store: NamespaceStore
62118
}
63119

64120
pub struct PointInTimeRestore {

0 commit comments

Comments
 (0)