Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions docs/REPLICATION_DESIGN.md

Large diffs are not rendered by default.

138 changes: 131 additions & 7 deletions src/replication/audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! Challenge-response for claimed holders. Anti-outsourcing protection.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use crate::logging::{debug, info, warn};
Expand All @@ -15,10 +15,13 @@ use crate::replication::protocol::{
compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
ReplicationMessageBody, ABSENT_KEY_DIGEST,
};
use crate::replication::types::{AuditFailureReason, FailureEvidence, PeerSyncRecord};
use crate::replication::types::{
AuditFailureReason, FailureEvidence, PeerSyncRecord, RepairProofs,
};
use crate::storage::LmdbStorage;
use saorsa_core::identity::PeerId;
use saorsa_core::P2PNode;
use tokio::sync::RwLock;

// ---------------------------------------------------------------------------
// Audit tick result
Expand Down Expand Up @@ -61,13 +64,42 @@ pub enum AuditTickResult {
/// **Invariant 19**: Returns [`AuditTickResult::Idle`] immediately if
/// `is_bootstrapping` is `true` — a node must not audit others while it
/// is still bootstrapping.
#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
#[allow(clippy::implicit_hasher)]
pub async fn audit_tick(
p2p_node: &Arc<P2PNode>,
storage: &Arc<LmdbStorage>,
config: &ReplicationConfig,
sync_history: &HashMap<PeerId, PeerSyncRecord>,
is_bootstrapping: bool,
) -> AuditTickResult {
let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
audit_tick_with_repair_proofs(
p2p_node,
storage,
config,
sync_history,
&repair_proofs,
0,
is_bootstrapping,
)
.await
}

/// Execute one repair-proof-gated audit tick.
///
/// This is the production path used by the replication engine. The
/// compatibility [`audit_tick`] wrapper passes an empty proof table, so direct
/// callers that have not adopted repair proofs remain conservative and do not
/// audit peers for unproven keys.
#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
pub async fn audit_tick_with_repair_proofs(
p2p_node: &Arc<P2PNode>,
storage: &Arc<LmdbStorage>,
config: &ReplicationConfig,
sync_history: &HashMap<PeerId, PeerSyncRecord>,
repair_proofs: &Arc<RwLock<RepairProofs>>,
current_sync_epoch: u64,
is_bootstrapping: bool,
) -> AuditTickResult {
// Invariant 19: never audit while still bootstrapping.
if is_bootstrapping {
Expand Down Expand Up @@ -119,17 +151,30 @@ pub async fn audit_tick(
.collect()
};

// Step 4: Filter to keys where the chosen peer is in the close group.
let mut peer_keys = Vec::new();
// Step 4: Filter to keys where the chosen peer is in the close group and
// this node has proof that it already sent the peer a repair hint for the
// specific key.
let mut sampled_key_groups = Vec::new();
for key in &sampled_keys {
let closest = dht
.find_closest_nodes_local_with_self(key, config.close_group_size)
.await;
if closest.iter().any(|n| n.peer_id == challenged_peer) {
peer_keys.push(*key);
let close_peers: HashSet<PeerId> = closest.iter().map(|node| node.peer_id).collect();
if close_peers.contains(&challenged_peer) {
sampled_key_groups.push((*key, close_peers));
}
}

let peer_keys = {
let mut proofs = repair_proofs.write().await;
mature_audit_keys_for_peer(
&challenged_peer,
sampled_key_groups,
&mut proofs,
current_sync_epoch,
)
};

if peer_keys.is_empty() {
return AuditTickResult::Idle;
}
Expand Down Expand Up @@ -299,6 +344,22 @@ fn eligible_audit_peers(sync_history: &HashMap<PeerId, PeerSyncRecord>) -> Vec<P
.collect()
}

fn mature_audit_keys_for_peer(
challenged_peer: &PeerId,
sampled_key_groups: Vec<(XorName, HashSet<PeerId>)>,
repair_proofs: &mut RepairProofs,
current_sync_epoch: u64,
) -> Vec<XorName> {
sampled_key_groups
.into_iter()
.filter_map(|(key, close_peers)| {
repair_proofs
.has_mature_replica_hint(challenged_peer, &key, &close_peers, current_sync_epoch)
.then_some(key)
})
.collect()
}

// ---------------------------------------------------------------------------
// Digest verification
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -1133,6 +1194,69 @@ mod tests {
);
}

#[test]
fn audit_key_filter_accepts_only_mature_current_snapshot_repair_proofs() {
const HINT_EPOCH: u64 = 7;
const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
const CHALLENGED_PEER_BYTE: u8 = 0xA1;
const OTHER_PEER_BYTE: u8 = 0xA2;
const NEW_PEER_BYTE: u8 = 0xA3;
const MATURE_KEY_BYTE: u8 = 0xB1;
const SAME_EPOCH_KEY_BYTE: u8 = 0xB2;
const MISSING_PROOF_KEY_BYTE: u8 = 0xB3;
const STALE_SNAPSHOT_KEY_BYTE: u8 = 0xB4;
const XOR_NAME_LEN: usize = 32;

let challenged_peer = peer_id_from_bytes([CHALLENGED_PEER_BYTE; XOR_NAME_LEN]);
let other_peer = peer_id_from_bytes([OTHER_PEER_BYTE; XOR_NAME_LEN]);
let new_peer = peer_id_from_bytes([NEW_PEER_BYTE; XOR_NAME_LEN]);
let mature_key = [MATURE_KEY_BYTE; XOR_NAME_LEN];
let same_epoch_key = [SAME_EPOCH_KEY_BYTE; XOR_NAME_LEN];
let missing_proof_key = [MISSING_PROOF_KEY_BYTE; XOR_NAME_LEN];
let stale_snapshot_key = [STALE_SNAPSHOT_KEY_BYTE; XOR_NAME_LEN];
let close_group = HashSet::from([challenged_peer, other_peer]);
let changed_close_group = HashSet::from([challenged_peer, new_peer]);
let mut repair_proofs = RepairProofs::new();

assert!(repair_proofs.record_replica_hint_sent(
challenged_peer,
mature_key,
&close_group,
HINT_EPOCH,
));
assert!(repair_proofs.record_replica_hint_sent(
challenged_peer,
same_epoch_key,
&close_group,
CURRENT_EPOCH,
));
assert!(repair_proofs.record_replica_hint_sent(
challenged_peer,
stale_snapshot_key,
&close_group,
HINT_EPOCH,
));

let sampled_key_groups = vec![
(mature_key, close_group.clone()),
(same_epoch_key, close_group.clone()),
(missing_proof_key, close_group),
(stale_snapshot_key, changed_close_group),
];
let peer_keys = mature_audit_keys_for_peer(
&challenged_peer,
sampled_key_groups,
&mut repair_proofs,
CURRENT_EPOCH,
);

assert_eq!(
peer_keys,
vec![mature_key],
"only mature proofs for the current close-group snapshot should become audit keys"
);
}

// -- Audit response must match key count --------------------------------------

#[tokio::test]
Expand Down
Loading
Loading