|
| 1 | +use std::sync::{atomic::AtomicBool, Arc}; |
| 2 | + |
| 3 | +use futures::prelude::Future; |
| 4 | +use tokio::task::JoinSet; |
| 5 | + |
| 6 | +use crate::database::{Database, SchemaDatabase}; |
| 7 | +use crate::namespace::meta_store::MetaStoreHandle; |
| 8 | +use crate::namespace::{ |
| 9 | + Namespace, NamespaceConfig, NamespaceName, NamespaceStore, |
| 10 | + ResetCb, ResolveNamespacePathFn, RestoreOption, |
| 11 | +}; |
| 12 | +use crate::namespace::broadcasters::BroadcasterHandle; |
| 13 | + |
| 14 | +use super::ConfigureNamespace; |
| 15 | + |
| 16 | +pub struct SchemaConfigurator; |
| 17 | + |
| 18 | +impl ConfigureNamespace for SchemaConfigurator { |
| 19 | + fn setup<'a>( |
| 20 | + &'a self, |
| 21 | + ns_config: &'a NamespaceConfig, |
| 22 | + db_config: MetaStoreHandle, |
| 23 | + restore_option: RestoreOption, |
| 24 | + name: &'a NamespaceName, |
| 25 | + _reset: ResetCb, |
| 26 | + resolve_attach_path: ResolveNamespacePathFn, |
| 27 | + _store: NamespaceStore, |
| 28 | + broadcaster: BroadcasterHandle, |
| 29 | + ) -> std::pin::Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> { |
| 30 | + Box::pin(async move { |
| 31 | + let mut join_set = JoinSet::new(); |
| 32 | + let db_path = ns_config.base_path.join("dbs").join(name.as_str()); |
| 33 | + |
| 34 | + tokio::fs::create_dir_all(&db_path).await?; |
| 35 | + |
| 36 | + let (connection_maker, wal_manager, stats) = Namespace::make_primary_connection_maker( |
| 37 | + ns_config, |
| 38 | + &db_config, |
| 39 | + &db_path, |
| 40 | + &name, |
| 41 | + restore_option, |
| 42 | + Arc::new(AtomicBool::new(false)), // this is always false for schema |
| 43 | + &mut join_set, |
| 44 | + resolve_attach_path, |
| 45 | + broadcaster, |
| 46 | + ) |
| 47 | + .await?; |
| 48 | + |
| 49 | + Ok(Namespace { |
| 50 | + db: Database::Schema(SchemaDatabase::new( |
| 51 | + ns_config.migration_scheduler.clone(), |
| 52 | + name.clone(), |
| 53 | + connection_maker, |
| 54 | + wal_manager, |
| 55 | + db_config.clone(), |
| 56 | + )), |
| 57 | + name: name.clone(), |
| 58 | + tasks: join_set, |
| 59 | + stats, |
| 60 | + db_config_store: db_config.clone(), |
| 61 | + path: db_path.into(), |
| 62 | + }) |
| 63 | + }) |
| 64 | + } |
| 65 | +} |
0 commit comments