Skip to content

Commit 0917f84

Browse files
committed
introduce NamespaceConfigurator
1 parent f0d6611 commit 0917f84

4 files changed

Lines changed: 383 additions & 8 deletions

File tree

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use std::pin::Pin;
2+
3+
use futures::Future;
4+
5+
use super::broadcasters::BroadcasterHandle;
6+
use super::meta_store::MetaStoreHandle;
7+
use super::{NamespaceConfig, NamespaceName, NamespaceStore, ResetCb, ResolveNamespacePathFn, RestoreOption};
8+
9+
mod replica;
10+
mod primary;
11+
12+
type DynConfigurator = Box<dyn ConfigureNamespace + Send + Sync + 'static>;
13+
14+
#[derive(Default)]
15+
struct NamespaceConfigurators {
16+
replica_configurator: Option<DynConfigurator>,
17+
primary_configurator: Option<DynConfigurator>,
18+
schema_configurator: Option<DynConfigurator>,
19+
}
20+
21+
impl NamespaceConfigurators {
22+
pub fn with_primary(
23+
&mut self,
24+
c: impl ConfigureNamespace + Send + Sync + 'static,
25+
) -> &mut Self {
26+
self.primary_configurator = Some(Box::new(c));
27+
self
28+
}
29+
30+
pub fn with_replica(
31+
&mut self,
32+
c: impl ConfigureNamespace + Send + Sync + 'static,
33+
) -> &mut Self {
34+
self.replica_configurator = Some(Box::new(c));
35+
self
36+
}
37+
38+
pub fn with_schema(&mut self, c: impl ConfigureNamespace + Send + Sync + 'static) -> &mut Self {
39+
self.schema_configurator = Some(Box::new(c));
40+
self
41+
}
42+
}
43+
44+
pub trait ConfigureNamespace {
45+
fn setup<'a>(
46+
&'a self,
47+
ns_config: &'a NamespaceConfig,
48+
db_config: MetaStoreHandle,
49+
restore_option: RestoreOption,
50+
name: &'a NamespaceName,
51+
reset: ResetCb,
52+
resolve_attach_path: ResolveNamespacePathFn,
53+
store: NamespaceStore,
54+
broadcaster: BroadcasterHandle,
55+
) -> Pin<Box<dyn Future<Output = crate::Result<super::Namespace>> + Send + 'a>>;
56+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
use std::sync::atomic::{AtomicBool, Ordering};
2+
use std::{path::Path, pin::Pin, sync::Arc};
3+
4+
use futures::prelude::Future;
5+
use tokio::task::JoinSet;
6+
7+
use crate::connection::MakeConnection;
8+
use crate::database::{Database, PrimaryDatabase};
9+
use crate::namespace::{Namespace, NamespaceConfig, NamespaceName, NamespaceStore, ResetCb, ResolveNamespacePathFn, RestoreOption};
10+
use crate::namespace::meta_store::MetaStoreHandle;
11+
use crate::namespace::broadcasters::BroadcasterHandle;
12+
use crate::run_periodic_checkpoint;
13+
use crate::schema::{has_pending_migration_task, setup_migration_table};
14+
15+
use super::ConfigureNamespace;
16+
17+
pub struct PrimaryConfigurator;
18+
19+
impl ConfigureNamespace for PrimaryConfigurator {
20+
fn setup<'a>(
21+
&'a self,
22+
config: &'a NamespaceConfig,
23+
meta_store_handle: MetaStoreHandle,
24+
restore_option: RestoreOption,
25+
name: &'a NamespaceName,
26+
_reset: ResetCb,
27+
resolve_attach_path: ResolveNamespacePathFn,
28+
_store: NamespaceStore,
29+
broadcaster: BroadcasterHandle,
30+
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>>
31+
{
32+
Box::pin(async move {
33+
let db_path: Arc<Path> = config.base_path.join("dbs").join(name.as_str()).into();
34+
let fresh_namespace = !db_path.try_exists()?;
35+
// FIXME: make that truly atomic. explore the idea of using temp directories, and it's implications
36+
match try_new_primary(
37+
config,
38+
name.clone(),
39+
meta_store_handle,
40+
restore_option,
41+
resolve_attach_path,
42+
db_path.clone(),
43+
broadcaster,
44+
)
45+
.await
46+
{
47+
Ok(this) => Ok(this),
48+
Err(e) if fresh_namespace => {
49+
tracing::error!("an error occured while deleting creating namespace, cleaning...");
50+
if let Err(e) = tokio::fs::remove_dir_all(&db_path).await {
51+
tracing::error!("failed to remove dirty namespace directory: {e}")
52+
}
53+
Err(e)
54+
}
55+
Err(e) => Err(e),
56+
}
57+
})
58+
}
59+
}
60+
61+
#[tracing::instrument(skip_all, fields(namespace))]
62+
async fn try_new_primary(
63+
ns_config: &NamespaceConfig,
64+
namespace: NamespaceName,
65+
meta_store_handle: MetaStoreHandle,
66+
restore_option: RestoreOption,
67+
resolve_attach_path: ResolveNamespacePathFn,
68+
db_path: Arc<Path>,
69+
broadcaster: BroadcasterHandle,
70+
) -> crate::Result<Namespace> {
71+
let mut join_set = JoinSet::new();
72+
73+
tokio::fs::create_dir_all(&db_path).await?;
74+
75+
let block_writes = Arc::new(AtomicBool::new(false));
76+
let (connection_maker, wal_wrapper, stats) = Namespace::make_primary_connection_maker(
77+
ns_config,
78+
&meta_store_handle,
79+
&db_path,
80+
&namespace,
81+
restore_option,
82+
block_writes.clone(),
83+
&mut join_set,
84+
resolve_attach_path,
85+
broadcaster,
86+
)
87+
.await?;
88+
let connection_maker = Arc::new(connection_maker);
89+
90+
if meta_store_handle.get().shared_schema_name.is_some() {
91+
let block_writes = block_writes.clone();
92+
let conn = connection_maker.create().await?;
93+
tokio::task::spawn_blocking(move || {
94+
conn.with_raw(|conn| -> crate::Result<()> {
95+
setup_migration_table(conn)?;
96+
if has_pending_migration_task(conn)? {
97+
block_writes.store(true, Ordering::SeqCst);
98+
}
99+
Ok(())
100+
})
101+
})
102+
.await
103+
.unwrap()?;
104+
}
105+
106+
if let Some(checkpoint_interval) = ns_config.checkpoint_interval {
107+
join_set.spawn(run_periodic_checkpoint(
108+
connection_maker.clone(),
109+
checkpoint_interval,
110+
namespace.clone(),
111+
));
112+
}
113+
114+
tracing::debug!("Done making new primary");
115+
116+
Ok(Namespace {
117+
tasks: join_set,
118+
db: Database::Primary(PrimaryDatabase {
119+
wal_wrapper,
120+
connection_maker,
121+
block_writes,
122+
}),
123+
name: namespace,
124+
stats,
125+
db_config_store: meta_store_handle,
126+
path: db_path.into(),
127+
})
128+
}
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
use std::pin::Pin;
2+
use std::sync::Arc;
3+
4+
use futures::Future;
5+
use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient;
6+
use tokio::task::JoinSet;
7+
8+
use crate::connection::write_proxy::MakeWriteProxyConn;
9+
use crate::connection::MakeConnection;
10+
use crate::database::{Database, ReplicaDatabase};
11+
use crate::namespace::broadcasters::BroadcasterHandle;
12+
use crate::namespace::meta_store::MetaStoreHandle;
13+
use crate::namespace::{Namespace, RestoreOption};
14+
use crate::namespace::{
15+
make_stats, NamespaceConfig, NamespaceName, NamespaceStore, ResetCb, ResetOp,
16+
ResolveNamespacePathFn,
17+
};
18+
use crate::{DB_CREATE_TIMEOUT, DEFAULT_AUTO_CHECKPOINT};
19+
20+
use super::ConfigureNamespace;
21+
22+
pub struct ReplicaConfigurator;
23+
24+
impl ConfigureNamespace for ReplicaConfigurator {
25+
fn setup<'a>(
26+
&'a self,
27+
config: &'a NamespaceConfig,
28+
meta_store_handle: MetaStoreHandle,
29+
restore_option: RestoreOption,
30+
name: &'a NamespaceName,
31+
reset: ResetCb,
32+
resolve_attach_path: ResolveNamespacePathFn,
33+
store: NamespaceStore,
34+
broadcaster: BroadcasterHandle,
35+
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>>
36+
{
37+
Box::pin(async move {
38+
tracing::debug!("creating replica namespace");
39+
let db_path = config.base_path.join("dbs").join(name.as_str());
40+
let channel = config.channel.clone().expect("bad replica config");
41+
let uri = config.uri.clone().expect("bad replica config");
42+
43+
let rpc_client = ReplicationLogClient::with_origin(channel.clone(), uri.clone());
44+
let client = crate::replication::replicator_client::Client::new(
45+
name.clone(),
46+
rpc_client,
47+
&db_path,
48+
meta_store_handle.clone(),
49+
store.clone(),
50+
)
51+
.await?;
52+
let applied_frame_no_receiver = client.current_frame_no_notifier.subscribe();
53+
let mut replicator = libsql_replication::replicator::Replicator::new(
54+
client,
55+
db_path.join("data"),
56+
DEFAULT_AUTO_CHECKPOINT,
57+
config.encryption_config.clone(),
58+
)
59+
.await?;
60+
61+
tracing::debug!("try perform handshake");
62+
// force a handshake now, to retrieve the primary's current replication index
63+
match replicator.try_perform_handshake().await {
64+
Err(libsql_replication::replicator::Error::Meta(
65+
libsql_replication::meta::Error::LogIncompatible,
66+
)) => {
67+
tracing::error!(
68+
"trying to replicate incompatible logs, reseting replica and nuking db dir"
69+
);
70+
std::fs::remove_dir_all(&db_path).unwrap();
71+
return self.setup(
72+
config,
73+
meta_store_handle,
74+
restore_option,
75+
name,
76+
reset,
77+
resolve_attach_path,
78+
store,
79+
broadcaster,
80+
)
81+
.await;
82+
}
83+
Err(e) => Err(e)?,
84+
Ok(_) => (),
85+
}
86+
87+
tracing::debug!("done performing handshake");
88+
89+
let primary_current_replicatio_index = replicator.client_mut().primary_replication_index;
90+
91+
let mut join_set = JoinSet::new();
92+
let namespace = name.clone();
93+
join_set.spawn(async move {
94+
use libsql_replication::replicator::Error;
95+
loop {
96+
match replicator.run().await {
97+
err @ Error::Fatal(_) => Err(err)?,
98+
err @ Error::NamespaceDoesntExist => {
99+
tracing::error!("namespace {namespace} doesn't exist, destroying...");
100+
(reset)(ResetOp::Destroy(namespace.clone()));
101+
Err(err)?;
102+
}
103+
e @ Error::Injector(_) => {
104+
tracing::error!("potential corruption detected while replicating, reseting replica: {e}");
105+
(reset)(ResetOp::Reset(namespace.clone()));
106+
Err(e)?;
107+
},
108+
Error::Meta(err) => {
109+
use libsql_replication::meta::Error;
110+
match err {
111+
Error::LogIncompatible => {
112+
tracing::error!("trying to replicate incompatible logs, reseting replica");
113+
(reset)(ResetOp::Reset(namespace.clone()));
114+
Err(err)?;
115+
}
116+
Error::InvalidMetaFile
117+
| Error::Io(_)
118+
| Error::InvalidLogId
119+
| Error::FailedToCommit(_)
120+
| Error::InvalidReplicationPath
121+
| Error::RequiresCleanDatabase => {
122+
// We retry from last frame index?
123+
tracing::warn!("non-fatal replication error, retrying from last commit index: {err}");
124+
},
125+
}
126+
}
127+
e @ (Error::Internal(_)
128+
| Error::Client(_)
129+
| Error::PrimaryHandshakeTimeout
130+
| Error::NeedSnapshot) => {
131+
tracing::warn!("non-fatal replication error, retrying from last commit index: {e}");
132+
},
133+
Error::NoHandshake => {
134+
// not strictly necessary, but in case the handshake error goes uncaught,
135+
// we reset the client state.
136+
replicator.client_mut().reset_token();
137+
}
138+
Error::SnapshotPending => unreachable!(),
139+
}
140+
}
141+
});
142+
143+
let stats = make_stats(
144+
&db_path,
145+
&mut join_set,
146+
meta_store_handle.clone(),
147+
config.stats_sender.clone(),
148+
name.clone(),
149+
applied_frame_no_receiver.clone(),
150+
config.encryption_config.clone(),
151+
)
152+
.await?;
153+
154+
let connection_maker = MakeWriteProxyConn::new(
155+
db_path.clone(),
156+
config.extensions.clone(),
157+
channel.clone(),
158+
uri.clone(),
159+
stats.clone(),
160+
broadcaster,
161+
meta_store_handle.clone(),
162+
applied_frame_no_receiver,
163+
config.max_response_size,
164+
config.max_total_response_size,
165+
primary_current_replicatio_index,
166+
config.encryption_config.clone(),
167+
resolve_attach_path,
168+
config.make_wal_manager.clone(),
169+
)
170+
.await?
171+
.throttled(
172+
config.max_concurrent_connections.clone(),
173+
Some(DB_CREATE_TIMEOUT),
174+
config.max_total_response_size,
175+
config.max_concurrent_requests,
176+
);
177+
178+
Ok(Namespace {
179+
tasks: join_set,
180+
db: Database::Replica(ReplicaDatabase {
181+
connection_maker: Arc::new(connection_maker),
182+
}),
183+
name: name.clone(),
184+
stats,
185+
db_config_store: meta_store_handle,
186+
path: db_path.into(),
187+
})
188+
})
189+
}
190+
}

0 commit comments

Comments
 (0)