diff --git a/docs/REPLICATION_DESIGN.md b/docs/REPLICATION_DESIGN.md index cc01556..1f45839 100644 --- a/docs/REPLICATION_DESIGN.md +++ b/docs/REPLICATION_DESIGN.md @@ -49,7 +49,8 @@ Primary goal: validate correctness, safety, and liveness of replication logic be - `QuorumTargets(K)`: up to `CLOSE_GROUP_SIZE` nearest known peers for key `K` in `LocalRT(self)`, excluding `self`; used as the candidate peer set for presence checks. - `QuorumNeeded(K)`: effective presence confirmation count for key `K`, defined as `min(QUORUM_THRESHOLD, floor(|QuorumTargets(K)|/2)+1)`. - `BootstrapDrained(N)`: bootstrap-completion gate for node `N`; true only when peer discovery closest to `N`'s own address has populated `LocalRT(N)`, bootstrap peer requests are finished (response or timeout), and bootstrap work queues are empty (`PendingVerify`, `FetchQueue`, `InFlightFetch` for bootstrap-discovered keys). -- `RepairOpportunity(P, KSet)`: evidence that peer `P` has previously received replication hints/offers for keys in `KSet` and had at least one subsequent neighbor-sync cycle to repair before audit evaluation. +- `RepairProof(P, K)`: local evidence that node `N` sent peer `P` a replica repair hint for key `K` while `P` was in `N`'s self-inclusive `CloseGroup(K)`. The proof records the close-group snapshot and local neighbor-sync cycle epoch at hint time; if the current self-inclusive close group for `K` differs, all proofs for `K` are invalidated. If `P` leaves `LocalRT(self)`, all proofs for `P` are invalidated. Peers require a fresh hint after re-entry. +- `RepairOpportunity(P, KSet)`: for every key in `KSet`, a mature `RepairProof(P, K)` exists and peer `P` has had at least one subsequent local neighbor-sync cycle epoch to repair before audit evaluation. Repair proofs are retained per key only for one close-group snapshot, bounding memory to `local_key_count * CLOSE_GROUP_SIZE`. 
- `BootstrapClaimFirstSeen(N, P)`: timestamp when node `N` first observed peer `P` responding with a bootstrapping claim to a sync, audit, or prune-confirmation request. Each peer gets at most one bootstrap-claim grace window. When `P` stops claiming bootstrap status, the active claim is cleared but the first-seen history is retained; any later bootstrap claim by the same peer is `BootstrapClaimAbuse`. - `TrustEngine`: local trust subsystem (EMA-based response-rate scoring with time decay) that consumes replication evidence events via `AdaptiveDHT::report_trust_event`, updates peer trust scores, and triggers peer eviction when scores fall below `block_threshold`. Consumer-reported events use `TrustEvent::ApplicationSuccess(weight)` / `TrustEvent::ApplicationFailure(weight)` with weight clamped to `MAX_CONSUMER_WEIGHT` (5.0). @@ -103,9 +104,9 @@ Parameter safety constraints (MUST hold): 17. A `PaidNotify(K)` only whitelists key `K` after receiver-side proof verification succeeds; sender assertions never whitelist by themselves. 18. Neighbor-sync paid hints are non-authoritative and carry no PoP; receivers MUST only whitelist by paid-list majority verification (`>= ConfirmNeeded(K)`) or close-group replica majority (Section 7.2 rule 4), never by hint claims alone. Paid-hint-only processing MAY enqueue record fetch only after authorization succeeds and `IsResponsible(self, K)` is true; otherwise it updates `PaidForList(self)` only. 19. Storage-proof audits start only after `BootstrapDrained(self)` becomes true. -20. Storage-proof audits target only peers derived from closest-peer lookups for sampled local keys, filtered through local authenticated routing state (`LocalRT(self)`), and further filtered to peers for which `RepairOpportunity` holds; random global peers and never-synced peers are never audited. +20. 
Storage-proof audits target only peers derived from closest-peer lookups for sampled local keys, filtered through local authenticated routing state (`LocalRT(self)`), and further filtered to `(peer, key)` pairs for which mature `RepairProof` and `RepairOpportunity` hold; random global peers, never-synced peers, peers without a key-specific repair hint proof, peers whose key proof predates a close-group change, and peers whose proof was cleared by routing-table removal are never audited for that key. 21. Verification-request batching is mandatory for unknown-key neighbor-sync verification and preserves per-key quorum semantics: each key receives explicit per-key evidence, and missing/timeout evidence is unresolved per key. -22. On every `NeighborSyncCycleComplete(self)`, node MUST run a prune pass using current `SelfInclusiveRT(self)`: for stored records where `IsResponsible(self, K)` is false, record `RecordOutOfRangeFirstSeen` if not already set and delete only when `now - RecordOutOfRangeFirstSeen >= PRUNE_HYSTERESIS_DURATION` and every currently-known `CloseGroup(K)` member returns a positive nonce-bound audit proof for the record. If any close-group peer does not prove storage, pruning is deferred and the retained local record continues participating in normal neighbor-sync repair. Prune-confirmation failures emit trust penalties after fresh responsibility confirmation; first-time bootstrap claims use the same one-time `BOOTSTRAP_CLAIM_GRACE_PERIOD` tracking before abuse penalties apply. Clear `RecordOutOfRangeFirstSeen` when back in range. For `PaidForList` entries where `self ∉ PaidCloseGroup(K)`, record `PaidOutOfRangeFirstSeen` if not already set and delete only when `now - PaidOutOfRangeFirstSeen >= PRUNE_HYSTERESIS_DURATION`; clear `PaidOutOfRangeFirstSeen` when back in range. The two timestamps are independent. +22. 
On every `NeighborSyncCycleComplete(self)`, node MUST run a prune pass using current `SelfInclusiveRT(self)`: for stored records where `IsResponsible(self, K)` is false, record `RecordOutOfRangeFirstSeen` if not already set and delete only when `now - RecordOutOfRangeFirstSeen >= PRUNE_HYSTERESIS_DURATION`, every currently-known `CloseGroup(K)` member has a mature `RepairProof(peer, K)` tied to the current close-group snapshot, and every currently-known `CloseGroup(K)` member returns a positive nonce-bound audit proof for the record. If any close-group peer lacks mature repair proof or does not prove storage, pruning is deferred and the retained local record continues participating in normal neighbor-sync repair. Prune-confirmation failures emit trust penalties after fresh responsibility confirmation and key-specific mature repair proof; first-time bootstrap claims use the same one-time `BOOTSTRAP_CLAIM_GRACE_PERIOD` tracking before abuse penalties apply. Clear `RecordOutOfRangeFirstSeen` when back in range. For `PaidForList` entries where `self ∉ PaidCloseGroup(K)`, record `PaidOutOfRangeFirstSeen` if not already set and delete only when `now - PaidOutOfRangeFirstSeen >= PRUNE_HYSTERESIS_DURATION`; clear `PaidOutOfRangeFirstSeen` when back in range. The two timestamps are independent. 23. Peers claiming bootstrap status are skipped for sync and audit without penalty for up to `BOOTSTRAP_CLAIM_GRACE_PERIOD` from first observation. After the grace period, each continued bootstrap claim emits `BootstrapClaimAbuse` evidence to `TrustEngine` (via `report_trust_event` with `ApplicationFailure(weight)`). If a peer stops claiming bootstrap and later claims bootstrap again, the repeated claim emits `BootstrapClaimAbuse` immediately; the grace period is not reset. 24. 
Audit trust-penalty signals require responsibility confirmation: on audit failure, challenger MUST perform fresh local RT closest-peer lookups for each challenged key and only penalize the peer for keys where it is confirmed responsible. @@ -361,7 +362,7 @@ Post-cycle responsibility pruning (triggered by `NeighborSyncCycleComplete(self) 1. For each locally stored key `K`, recompute `IsResponsible(self, K)` using current `SelfInclusiveRT(self)`: a. If in range: clear `RecordOutOfRangeFirstSeen(self, K)` (set to `None`). - b. If out of range: if `RecordOutOfRangeFirstSeen(self, K)` is `None`, set it to `now`. Once the hysteresis duration has elapsed, request a nonce-bound audit proof from every current `CloseGroup(K)` peer, subject to a bounded per-pass prune-confirmation budget. If any peer does not prove storage, defer pruning and emit trust penalties after fresh responsibility confirmation; first-time bootstrap claims are penalized only after the bootstrap grace period, while repeated bootstrap claims are penalized immediately. Delete the local record only when `now - RecordOutOfRangeFirstSeen(self, K) >= PRUNE_HYSTERESIS_DURATION` and every current close-group peer proves storage for `K`. + b. If out of range: if `RecordOutOfRangeFirstSeen(self, K)` is `None`, set it to `now`. Once the hysteresis duration has elapsed, request a nonce-bound audit proof only from current `CloseGroup(K)` peers for which a mature `RepairProof(peer, K)` exists for the current close-group snapshot, subject to a bounded per-pass prune-confirmation budget. If any current close-group peer lacks mature repair proof or does not prove storage, defer pruning and emit trust penalties only after fresh responsibility confirmation plus key-specific mature repair proof; first-time bootstrap claims are penalized only after the bootstrap grace period, while repeated bootstrap claims are penalized immediately. 
Delete the local record only when `now - RecordOutOfRangeFirstSeen(self, K) >= PRUNE_HYSTERESIS_DURATION`, every current close-group peer has mature repair proof for `K`, and every current close-group peer proves storage for `K`. 2. For each key `K` in `PaidForList(self)`, recompute `PaidCloseGroup(K)` membership using current `SelfInclusiveRT(self)`: a. If `self ∈ PaidCloseGroup(K)`: clear `PaidOutOfRangeFirstSeen(self, K)` (set to `None`). b. If `self ∉ PaidCloseGroup(K)`: if `PaidOutOfRangeFirstSeen(self, K)` is `None`, set it to `now`. Delete the entry only when `now - PaidOutOfRangeFirstSeen(self, K) >= PRUNE_HYSTERESIS_DURATION`. @@ -431,7 +432,7 @@ Rules: 3. A `ReplicationFailure` is emitted per peer per failed fetch attempt, not per key. If a key requires two retries from two different peers before succeeding on the third, each of the two failed peers emits one failure event. 4. Replication SHOULD mark fetch-failure evidence as stale/low-confidence if the key later succeeds via an alternate verified source. 5. On audit failure, replication MUST first run the responsibility confirmation (Section 15 step 9). If the confirmed failure set is non-empty, emit `AuditFailure` evidence with `challenge_id`, `challenged_peer_id`, confirmed failure keys, and failure reason. If the confirmed failure set is empty, no `AuditFailure` is emitted. -6. Replication MUST emit a trust-penalty signal to `TrustEngine` (via `report_trust_event` with `ApplicationFailure(weight)`) for audit failure only when both conditions hold: the confirmed failure set from responsibility confirmation is non-empty (Section 15 step 9d) AND `RepairOpportunity(challenged_peer_id, confirmed_failure_keys)` is true. +6. 
Replication MUST emit a trust-penalty signal to `TrustEngine` (via `report_trust_event` with `ApplicationFailure(weight)`) for audit failure only when both conditions hold: the confirmed failure set from responsibility confirmation is non-empty (Section 15 step 9d) AND `RepairOpportunity(challenged_peer_id, confirmed_failure_keys)` is true for the specific failed keys using mature proofs for the current close-group snapshot. 7. On bootstrap claim past grace period, replication MUST emit `BootstrapClaimAbuse` evidence with `peer_id` and `BootstrapClaimFirstSeen` timestamp. Evidence is emitted on each sync or audit attempt where the peer claims bootstrapping after `BOOTSTRAP_CLAIM_GRACE_PERIOD`. A repeated bootstrap claim after the peer previously stopped claiming bootstrap also MUST emit `BootstrapClaimAbuse` immediately. 8. When a peer that previously claimed bootstrap status stops claiming it (responds normally to sync or audit), node MUST clear the active bootstrap claim but MUST retain the first-seen history. The peer MUST NOT receive a second bootstrap grace window. 9. Final trust-score updates and any eventual peer eviction are determined by `TrustEngine` / `AdaptiveDHT`, not by replication logic. @@ -441,9 +442,9 @@ Rules: Challenge-response for claimed holders: 1. Challenger creates unique challenge id + nonce. -2. Challenger selects one peer uniformly at random from peers with `RepairOpportunity` as `challenged_peer_id`. If no eligible peers exist, the audit tick is idle. +2. Challenger selects one peer uniformly at random from peers with peer-level sync history sufficient for `RepairOpportunity` as `challenged_peer_id`. If no eligible peers exist, the audit tick is idle. 3. Challenger samples `SeedKeys` uniformly at random from locally stored record keys, with `|SeedKeys| = max(floor(sqrt(local_store_key_count)), 1)` (capped at `local_store_key_count`). If local store is empty, the audit tick is idle. -4. 
For each `K` in `SeedKeys`, challenger checks whether `challenged_peer_id` appears in the `CLOSE_GROUP_SIZE` closest peers for `K` via local RT lookup. Keys where the peer is not responsible are discarded. The remaining keys form `PeerKeySet(challenged_peer_id)`. +4. For each `K` in `SeedKeys`, challenger checks whether `challenged_peer_id` appears in the `CLOSE_GROUP_SIZE` closest peers for `K` via local RT lookup and whether a mature `RepairProof(challenged_peer_id, K)` exists for that exact close-group snapshot. If the current snapshot differs from the snapshot recorded with the proof, all local proofs for `K` are invalidated. Keys where the peer is not responsible or where no key-specific mature repair proof exists are discarded. The remaining keys form `PeerKeySet(challenged_peer_id)`. 5. If `PeerKeySet` is empty, the audit tick is idle. 6. Challenger sends `challenged_peer_id` an ordered challenge key set equal to `PeerKeySet(challenged_peer_id)`. 7. Target responds with either per-key `AuditKeyDigest` values or a bootstrapping claim: @@ -573,7 +574,7 @@ Each scenario should assert exact expected outcomes and state transitions. 18. Invalid runtime config: - Node rejects configs violating parameter safety constraints. 19. Audit per-key digest mismatch with confirmed responsibility: -- Peer `P` is challenged on keys `{K1, K2, K3}`. `P` returns per-key digests: `K1` matches, `K2` mismatches, `K3` absent. Challenger runs responsibility confirmation for failed keys `{K2, K3}`: `P` appears in fresh lookup for `K2` but not `K3`. `AuditFailure` is emitted for `{K2}` only. Trust-penalty signal is emitted only when `RepairOpportunity(P, {K2})` is also true. +- Peer `P` is challenged on keys `{K1, K2, K3}`. `P` returns per-key digests: `K1` matches, `K2` mismatches, `K3` absent. Challenger runs responsibility confirmation for failed keys `{K2, K3}`: `P` appears in fresh lookup for `K2` but not `K3`. `AuditFailure` is emitted for `{K2}` only. 
Trust-penalty signal is emitted only when `RepairOpportunity(P, {K2})` is also true with mature proof for the current close-group snapshot. 20. Paid-list local hit: - Admitted unknown replica key with local paid-list entry bypasses presence quorum and enters fetch pipeline. 21. Paid-list majority confirmation: @@ -607,7 +608,7 @@ Each scenario should assert exact expected outcomes and state transitions. 35. Neighbor-sync round-robin batch selection with cooldown skip: - With more than `NEIGHBOR_SYNC_PEER_COUNT` eligible peers, consecutive rounds scan forward from cursor, skip and remove cooldown peers, and sync the next batch of up to `NEIGHBOR_SYNC_PEER_COUNT` non-cooldown peers. Cycle completes when all snapshot peers have been synced, skipped (cooldown), or removed (unreachable). 36. Post-cycle responsibility pruning with time-based hysteresis: -- When a full neighbor-sync round-robin cycle completes, node runs one prune pass using current `SelfInclusiveRT(self)` (`LocalRT(self) ∪ {self}`): stored keys with `IsResponsible(self, K)=false` have `RecordOutOfRangeFirstSeen` recorded (if not already set) but are deleted only when `now - RecordOutOfRangeFirstSeen >= PRUNE_HYSTERESIS_DURATION` and every current close-group peer returns a positive nonce-bound audit proof for the key. Missing or failed proofs defer pruning and emit trust penalties after fresh responsibility confirmation; first-time bootstrap claims are penalized only after the bootstrap grace period, while repeated bootstrap claims are penalized immediately. Prune-confirmation work is bounded per pass. Keys that are in range have their `RecordOutOfRangeFirstSeen` cleared. Same hysteresis timing applies independently to `PaidForList` entries where `self ∉ PaidCloseGroup(K)` using `PaidOutOfRangeFirstSeen`, but paid-list pruning does not require remote storage confirmation. 
+- When a full neighbor-sync round-robin cycle completes, node runs one prune pass using current `SelfInclusiveRT(self)` (`LocalRT(self) ∪ {self}`): stored keys with `IsResponsible(self, K)=false` have `RecordOutOfRangeFirstSeen` recorded (if not already set) but are deleted only when `now - RecordOutOfRangeFirstSeen >= PRUNE_HYSTERESIS_DURATION`, every current close-group peer has mature repair proof for the current close-group snapshot, and every current close-group peer returns a positive nonce-bound audit proof for the key. Missing or failed proofs defer pruning and emit trust penalties after fresh responsibility confirmation; first-time bootstrap claims are penalized only after the bootstrap grace period, while repeated bootstrap claims are penalized immediately. Prune-confirmation work is bounded per pass. Keys that are in range have their `RecordOutOfRangeFirstSeen` cleared. Same hysteresis timing applies independently to `PaidForList` entries where `self ∉ PaidCloseGroup(K)` using `PaidOutOfRangeFirstSeen`, but paid-list pruning does not require remote storage confirmation. 37. Non-`LocalRT` inbound sync behavior: - If a peer opens sync while not in receiver `LocalRT(self)`, receiver may still send hints to that peer, but receiver drops all inbound replica/paid hints from that peer. 38. Neighbor-sync snapshot stability under peer join: @@ -635,7 +636,7 @@ Each scenario should assert exact expected outcomes and state transitions. 49. Bootstrap active claim cleared on normal response, with history retained: - Peer `P` previously claimed bootstrapping. `P` later responds normally to a sync or audit request. Node clears the active bootstrap claim but retains `BootstrapClaimFirstSeen(self, P)` history. If `P` later claims bootstrapping again, node emits `BootstrapClaimAbuse` immediately instead of granting a second grace period. 50. Prune hysteresis prevents premature deletion: -- Key `K` goes out of range at time `T`. 
`RecordOutOfRangeFirstSeen(self, K)` is set to `T`. Key is NOT deleted. At `T + 24h` (less than `PRUNE_HYSTERESIS_DURATION`), key is still retained. At `T + 3 days` (`>= PRUNE_HYSTERESIS_DURATION`), key is eligible for deletion on the next prune pass only if all current close-group peers return positive nonce-bound audit proofs. +- Key `K` goes out of range at time `T`. `RecordOutOfRangeFirstSeen(self, K)` is set to `T`. Key is NOT deleted. At `T + 24h` (less than `PRUNE_HYSTERESIS_DURATION`), key is still retained. At `T + 3 days` (`>= PRUNE_HYSTERESIS_DURATION`), key is eligible for deletion on the next prune pass only if all current close-group peers have mature repair proof for the current close-group snapshot and return positive nonce-bound audit proofs. 51. Prune hysteresis timestamp reset on partition heal: - Key `K` goes out of range at time `T`. `RecordOutOfRangeFirstSeen(self, K)` is set to `T`. At `T + 4h`, partition heals, peers return, `K` is back in range. `RecordOutOfRangeFirstSeen` is cleared. Key is retained. If `K` later goes out of range again, the clock restarts from zero. 52. Prune hysteresis applies to paid-list entries: @@ -647,7 +648,7 @@ Each scenario should assert exact expected outcomes and state transitions. 55. Audit per-key failure with no confirmed responsibility: - Peer `P` is challenged on `{K1, K2}`. Per-key digests: both mismatch. Responsibility confirmation: `P` does not appear in fresh lookup results for either key. Entire audit failure is discarded — no `AuditFailure` evidence emitted, no trust-penalty signal. 56. Audit skips never-synced peer: -- Peer `P` appears in closest-peer lookup results for sampled keys and is in `LocalRT(self)`, but `RepairOpportunity(P, _)` is false (no prior sync). `P` is removed from `CandidatePeersRT` before `PeerKeySet` construction. If no other eligible peers remain, audit tick is idle. No challenge is sent to `P`, no network resources consumed. 
+- Peer `P` appears in closest-peer lookup results for sampled keys and is in `LocalRT(self)`, but `RepairOpportunity(P, _)` is false (no prior sync, no key-specific mature `RepairProof(P, K)` for sampled keys, or the proof predates a close-group snapshot change). `P` is removed from `CandidatePeersRT` before `PeerKeySet` construction. If no other eligible peers remain, audit tick is idle. No challenge is sent to `P`, no network resources consumed. ## 19. Acceptance Criteria for This Design diff --git a/src/replication/audit.rs b/src/replication/audit.rs index 3c2bcc6..af4584f 100644 --- a/src/replication/audit.rs +++ b/src/replication/audit.rs @@ -2,7 +2,7 @@ //! //! Challenge-response for claimed holders. Anti-outsourcing protection. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use crate::logging::{debug, info, warn}; @@ -15,10 +15,13 @@ use crate::replication::protocol::{ compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage, ReplicationMessageBody, ABSENT_KEY_DIGEST, }; -use crate::replication::types::{AuditFailureReason, FailureEvidence, PeerSyncRecord}; +use crate::replication::types::{ + AuditFailureReason, FailureEvidence, PeerSyncRecord, RepairProofs, +}; use crate::storage::LmdbStorage; use saorsa_core::identity::PeerId; use saorsa_core::P2PNode; +use tokio::sync::RwLock; // --------------------------------------------------------------------------- // Audit tick result @@ -61,13 +64,42 @@ pub enum AuditTickResult { /// **Invariant 19**: Returns [`AuditTickResult::Idle`] immediately if /// `is_bootstrapping` is `true` — a node must not audit others while it /// is still bootstrapping. 
-#[allow(clippy::implicit_hasher, clippy::too_many_lines)] +#[allow(clippy::implicit_hasher)] pub async fn audit_tick( p2p_node: &Arc, storage: &Arc, config: &ReplicationConfig, sync_history: &HashMap, is_bootstrapping: bool, +) -> AuditTickResult { + let repair_proofs = Arc::new(RwLock::new(RepairProofs::new())); + audit_tick_with_repair_proofs( + p2p_node, + storage, + config, + sync_history, + &repair_proofs, + 0, + is_bootstrapping, + ) + .await +} + +/// Execute one repair-proof-gated audit tick. +/// +/// This is the production path used by the replication engine. The +/// compatibility [`audit_tick`] wrapper passes an empty proof table, so direct +/// callers that have not adopted repair proofs remain conservative and do not +/// audit peers for unproven keys. +#[allow(clippy::implicit_hasher, clippy::too_many_lines)] +pub async fn audit_tick_with_repair_proofs( + p2p_node: &Arc, + storage: &Arc, + config: &ReplicationConfig, + sync_history: &HashMap, + repair_proofs: &Arc>, + current_sync_epoch: u64, + is_bootstrapping: bool, ) -> AuditTickResult { // Invariant 19: never audit while still bootstrapping. if is_bootstrapping { @@ -119,17 +151,30 @@ pub async fn audit_tick( .collect() }; - // Step 4: Filter to keys where the chosen peer is in the close group. - let mut peer_keys = Vec::new(); + // Step 4: Filter to keys where the chosen peer is in the close group and + // this node has proof that it already sent the peer a repair hint for the + // specific key. 
+ let mut sampled_key_groups = Vec::new(); for key in &sampled_keys { let closest = dht .find_closest_nodes_local_with_self(key, config.close_group_size) .await; - if closest.iter().any(|n| n.peer_id == challenged_peer) { - peer_keys.push(*key); + let close_peers: HashSet = closest.iter().map(|node| node.peer_id).collect(); + if close_peers.contains(&challenged_peer) { + sampled_key_groups.push((*key, close_peers)); } } + let peer_keys = { + let mut proofs = repair_proofs.write().await; + mature_audit_keys_for_peer( + &challenged_peer, + sampled_key_groups, + &mut proofs, + current_sync_epoch, + ) + }; + if peer_keys.is_empty() { return AuditTickResult::Idle; } @@ -299,6 +344,22 @@ fn eligible_audit_peers(sync_history: &HashMap) -> Vec

)>, + repair_proofs: &mut RepairProofs, + current_sync_epoch: u64, +) -> Vec { + sampled_key_groups + .into_iter() + .filter_map(|(key, close_peers)| { + repair_proofs + .has_mature_replica_hint(challenged_peer, &key, &close_peers, current_sync_epoch) + .then_some(key) + }) + .collect() +} + // --------------------------------------------------------------------------- // Digest verification // --------------------------------------------------------------------------- @@ -1133,6 +1194,79 @@ mod tests { ); } + #[test] + fn audit_key_filter_retains_stable_proofs_and_rejects_evicted_peers() { + const HINT_EPOCH: u64 = 7; + const CURRENT_EPOCH: u64 = HINT_EPOCH + 1; + const CHALLENGED_PEER_BYTE: u8 = 0xA1; + const OTHER_PEER_BYTE: u8 = 0xA2; + const NEW_PEER_BYTE: u8 = 0xA3; + const MATURE_KEY_BYTE: u8 = 0xB1; + const SAME_EPOCH_KEY_BYTE: u8 = 0xB2; + const MISSING_PROOF_KEY_BYTE: u8 = 0xB3; + const STABLE_CHURN_KEY_BYTE: u8 = 0xB4; + const EVICTED_KEY_BYTE: u8 = 0xB5; + const XOR_NAME_LEN: usize = 32; + + let challenged_peer = peer_id_from_bytes([CHALLENGED_PEER_BYTE; XOR_NAME_LEN]); + let other_peer = peer_id_from_bytes([OTHER_PEER_BYTE; XOR_NAME_LEN]); + let new_peer = peer_id_from_bytes([NEW_PEER_BYTE; XOR_NAME_LEN]); + let mature_key = [MATURE_KEY_BYTE; XOR_NAME_LEN]; + let same_epoch_key = [SAME_EPOCH_KEY_BYTE; XOR_NAME_LEN]; + let missing_proof_key = [MISSING_PROOF_KEY_BYTE; XOR_NAME_LEN]; + let stable_churn_key = [STABLE_CHURN_KEY_BYTE; XOR_NAME_LEN]; + let evicted_key = [EVICTED_KEY_BYTE; XOR_NAME_LEN]; + let close_group = HashSet::from([challenged_peer, other_peer]); + let changed_close_group = HashSet::from([challenged_peer, new_peer]); + let evicted_close_group = HashSet::from([other_peer, new_peer]); + let mut repair_proofs = RepairProofs::new(); + + assert!(repair_proofs.record_replica_hint_sent( + challenged_peer, + mature_key, + &close_group, + HINT_EPOCH, + )); + assert!(repair_proofs.record_replica_hint_sent( + challenged_peer, + same_epoch_key, + 
&close_group, + CURRENT_EPOCH, + )); + assert!(repair_proofs.record_replica_hint_sent( + challenged_peer, + stable_churn_key, + &close_group, + HINT_EPOCH, + )); + assert!(repair_proofs.record_replica_hint_sent( + challenged_peer, + evicted_key, + &close_group, + HINT_EPOCH, + )); + + let sampled_key_groups = vec![ + (mature_key, close_group.clone()), + (same_epoch_key, close_group.clone()), + (missing_proof_key, close_group.clone()), + (stable_churn_key, changed_close_group), + (evicted_key, evicted_close_group), + ]; + let peer_keys = mature_audit_keys_for_peer( + &challenged_peer, + sampled_key_groups, + &mut repair_proofs, + CURRENT_EPOCH, + ); + + assert_eq!( + peer_keys, + vec![mature_key, stable_churn_key], + "mature proofs for stable close-group peers should become audit keys, while same-epoch, missing, and evicted-peer proofs should not" + ); + } + // -- Audit response must match key count -------------------------------------- #[tokio::test] diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 9e4e03f..996de48 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -59,7 +59,7 @@ use crate::replication::quorum::KeyVerificationOutcome; use crate::replication::scheduling::ReplicationQueues; use crate::replication::types::{ AuditFailureReason, BootstrapClaimObservation, BootstrapState, FailureEvidence, HintPipeline, - NeighborSyncState, PeerSyncRecord, VerificationEntry, VerificationState, + NeighborSyncState, PeerSyncRecord, RepairProofs, VerificationEntry, VerificationState, }; use crate::storage::LmdbStorage; use saorsa_core::identity::PeerId; @@ -129,6 +129,10 @@ pub struct ReplicationEngine { /// are lightweight (`PeerSyncRecord` is two fields) and peer IDs are /// naturally bounded by the routing table's k-bucket capacity. sync_history: Arc>>, + /// Completed local neighbor-sync cycle epoch for proof maturity. + sync_cycle_epoch: Arc>, + /// Per-key repair proof tracking for audit eligibility. 
+ repair_proofs: Arc>, /// Bootstrap state tracking. bootstrap_state: Arc>, /// Whether this node is currently bootstrapping. @@ -187,6 +191,8 @@ impl ReplicationEngine { queues: Arc::new(RwLock::new(ReplicationQueues::new())), sync_state: Arc::new(RwLock::new(initial_neighbors)), sync_history: Arc::new(RwLock::new(HashMap::new())), + sync_cycle_epoch: Arc::new(RwLock::new(0)), + repair_proofs: Arc::new(RwLock::new(RepairProofs::new())), bootstrap_state: Arc::new(RwLock::new(BootstrapState::new())), is_bootstrapping: Arc::new(RwLock::new(true)), sync_trigger: Arc::new(Notify::new()), @@ -358,6 +364,8 @@ impl ReplicationEngine { let is_bootstrapping = Arc::clone(&self.is_bootstrapping); let bootstrap_state = Arc::clone(&self.bootstrap_state); let sync_history = Arc::clone(&self.sync_history); + let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch); + let repair_proofs = Arc::clone(&self.repair_proofs); let sync_trigger = Arc::clone(&self.sync_trigger); let handle = tokio::spawn(async move { @@ -399,6 +407,8 @@ impl ReplicationEngine { &is_bootstrapping, &bootstrap_state, &sync_history, + &sync_cycle_epoch, + &repair_proofs, rr_message_id.as_deref(), ).await { Ok(()) => {} @@ -420,11 +430,17 @@ impl ReplicationEngine { // PeerDisconnected event against the close group. dht_event = dht_events.recv() => { let Ok(dht_event) = dht_event else { continue }; - if let DhtNetworkEvent::KClosestPeersChanged { .. } = dht_event { - debug!( - "K-closest peers changed, triggering early neighbor sync" - ); - sync_trigger.notify_one(); + match dht_event { + DhtNetworkEvent::KClosestPeersChanged { .. 
} => { + debug!( + "K-closest peers changed, triggering early neighbor sync" + ); + sync_trigger.notify_one(); + } + DhtNetworkEvent::PeerRemoved { peer_id } => { + repair_proofs.write().await.remove_peer(&peer_id); + } + _ => {} } } } @@ -443,6 +459,8 @@ impl ReplicationEngine { let shutdown = self.shutdown.clone(); let sync_state = Arc::clone(&self.sync_state); let sync_history = Arc::clone(&self.sync_history); + let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch); + let repair_proofs = Arc::clone(&self.repair_proofs); let is_bootstrapping = Arc::clone(&self.is_bootstrapping); let bootstrap_state = Arc::clone(&self.bootstrap_state); let sync_trigger = Arc::clone(&self.sync_trigger); @@ -470,6 +488,8 @@ impl ReplicationEngine { &config, &sync_state, &sync_history, + &sync_cycle_epoch, + &repair_proofs, &is_bootstrapping, &bootstrap_state, ) => {} @@ -508,6 +528,8 @@ impl ReplicationEngine { let config = Arc::clone(&self.config); let shutdown = self.shutdown.clone(); let sync_history = Arc::clone(&self.sync_history); + let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch); + let repair_proofs = Arc::clone(&self.repair_proofs); let bootstrap_state = Arc::clone(&self.bootstrap_state); let is_bootstrapping = Arc::clone(&self.is_bootstrapping); let sync_state = Arc::clone(&self.sync_state); @@ -532,7 +554,17 @@ impl ReplicationEngine { let bootstrapping = *is_bootstrapping.read().await; let result = { let history = sync_history.read().await; - audit::audit_tick(&p2p, &storage, &config, &history, bootstrapping).await + let current_sync_epoch = *sync_cycle_epoch.read().await; + audit::audit_tick_with_repair_proofs( + &p2p, + &storage, + &config, + &history, + &repair_proofs, + current_sync_epoch, + bootstrapping, + ) + .await }; handle_audit_result(&result, &p2p, &sync_state, &config).await; } @@ -546,11 +578,14 @@ impl ReplicationEngine { let bootstrapping = *is_bootstrapping.read().await; let result = { let history = sync_history.read().await; - 
audit::audit_tick( + let current_sync_epoch = *sync_cycle_epoch.read().await; + audit::audit_tick_with_repair_proofs( &p2p, &storage, &config, &history, + &repair_proofs, + current_sync_epoch, bootstrapping, ) .await @@ -777,6 +812,7 @@ impl ReplicationEngine { /// After the gate, finds close neighbors, syncs with each in /// round-robin batches, admits returned hints into the verification /// pipeline, and tracks discovered keys for bootstrap drain detection. + #[allow(clippy::too_many_lines)] fn start_bootstrap_sync( &mut self, dht_events: tokio::sync::broadcast::Receiver, @@ -790,6 +826,8 @@ impl ReplicationEngine { let is_bootstrapping = Arc::clone(&self.is_bootstrapping); let bootstrap_state = Arc::clone(&self.bootstrap_state); let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify); + let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch); + let repair_proofs = Arc::clone(&self.repair_proofs); let handle = tokio::spawn(async move { // Wait for DHT bootstrap to complete before snapshotting @@ -837,7 +875,7 @@ impl ReplicationEngine { bootstrap::increment_pending_requests(&bootstrap_state, 1).await; - let response = neighbor_sync::sync_with_peer( + let outcome = neighbor_sync::sync_with_peer_with_outcome( peer, &p2p, &storage, @@ -849,14 +887,21 @@ impl ReplicationEngine { bootstrap::decrement_pending_requests(&bootstrap_state, 1).await; - if let Some(resp) = response { - if !resp.bootstrapping { + if let Some(outcome) = outcome { + if !outcome.response.bootstrapping { + record_sent_replica_hints( + peer, + &outcome.sent_replica_hints, + &repair_proofs, + &sync_cycle_epoch, + ) + .await; // Admit hints into verification pipeline. 
let outcome = admit_and_queue_hints( &self_id, peer, - &resp.replica_hints, - &resp.paid_hints, + &outcome.response.replica_hints, + &outcome.response.paid_hints, &p2p, &config, &storage, @@ -924,6 +969,8 @@ async fn handle_replication_message( is_bootstrapping: &Arc>, bootstrap_state: &Arc>, sync_history: &Arc>>, + sync_cycle_epoch: &Arc>, + repair_proofs: &Arc>, rr_message_id: Option<&str>, ) -> Result<()> { let msg = ReplicationMessage::decode(data) @@ -968,6 +1015,8 @@ async fn handle_replication_message( bootstrapping, bootstrap_state, sync_history, + sync_cycle_epoch, + repair_proofs, msg.request_id, rr_message_id, ) @@ -1263,6 +1312,8 @@ async fn handle_neighbor_sync_request( is_bootstrapping: bool, bootstrap_state: &Arc>, sync_history: &Arc>>, + sync_cycle_epoch: &Arc>, + repair_proofs: &Arc>, request_id: u64, rr_message_id: Option<&str>, ) -> Result<()> { @@ -1275,19 +1326,20 @@ async fn handle_neighbor_sync_request( // bootstrap replication for newly-joined nodes with 0 stored chunks. // Build response (outbound hints). - let (response, sender_in_rt) = neighbor_sync::handle_sync_request( - source, - request, - p2p_node, - storage, - paid_list, - config, - is_bootstrapping, - ) - .await; + let (response, sent_replica_hints, sender_in_rt) = + neighbor_sync::handle_sync_request_with_proofs( + source, + request, + p2p_node, + storage, + paid_list, + config, + is_bootstrapping, + ) + .await; // Send response. - send_replication_response( + let response_sent = send_replication_response_checked( source, p2p_node, request_id, @@ -1301,7 +1353,8 @@ async fn handle_neighbor_sync_request( return Ok(()); } - // Update sync history for this peer. + // Update sync history for this peer before recording repair proofs so a + // same-tick audit cannot combine a fresh key proof with stale peer maturity. 
{ let mut history = sync_history.write().await; let record = history.entry(*source).or_insert(PeerSyncRecord { @@ -1312,6 +1365,11 @@ async fn handle_neighbor_sync_request( record.cycles_since_sync = 0; } + if response_sent && !request.bootstrapping { + record_sent_replica_hints(source, &sent_replica_hints, repair_proofs, sync_cycle_epoch) + .await; + } + // Admit inbound hints and queue for verification. let outcome = admit_and_queue_hints( &self_id, @@ -1472,25 +1530,44 @@ async fn handle_audit_challenge_msg( // Message sending helper // --------------------------------------------------------------------------- -/// Send a replication response message. Fire-and-forget: logs errors but -/// does not propagate them. +/// Send a replication response message as a best-effort reply. +/// +/// Encode and send failures are logged by the checked helper. Most response +/// paths do not need to branch on send success, so this wrapper keeps those +/// call sites explicit about their best-effort behavior. +async fn send_replication_response( + peer: &PeerId, + p2p_node: &Arc, + request_id: u64, + body: ReplicationMessageBody, + rr_message_id: Option<&str>, +) { + let _ = + send_replication_response_checked(peer, p2p_node, request_id, body, rr_message_id).await; +} + +/// Send a replication response message and report whether it was accepted. +/// +/// Returns `true` after the message is encoded and accepted by the P2P send +/// path. Returns `false` after logging an encode or send failure. Repair-proof +/// recording uses this to avoid trusting hints that were not actually sent. /// /// When `rr_message_id` is `Some`, the response is sent via the `/rr/` /// request-response path so saorsa-core can route it back to the caller's /// `send_request` future. Otherwise it is sent as a plain message. 
-async fn send_replication_response( +async fn send_replication_response_checked( peer: &PeerId, p2p_node: &Arc, request_id: u64, body: ReplicationMessageBody, rr_message_id: Option<&str>, -) { +) -> bool { let msg = ReplicationMessage { request_id, body }; let encoded = match msg.encode() { Ok(data) => data, Err(e) => { warn!("Failed to encode replication response: {e}"); - return; + return false; } }; let result = if let Some(msg_id) = rr_message_id { @@ -1504,6 +1581,30 @@ async fn send_replication_response( }; if let Err(e) = result { debug!("Failed to send replication response to {peer}: {e}"); + return false; + } + true +} + +async fn record_sent_replica_hints( + peer: &PeerId, + hints: &[neighbor_sync::SentReplicaHint], + repair_proofs: &Arc>, + sync_cycle_epoch: &Arc>, +) { + if hints.is_empty() { + return; + } + + let hinted_at_epoch = *sync_cycle_epoch.read().await; + let mut proofs = repair_proofs.write().await; + for hint in hints { + if proofs.record_replica_hint_sent(*peer, hint.key, &hint.close_peers, hinted_at_epoch) { + debug!( + "Recorded repair hint proof for peer {peer} and key {}", + hex::encode(hint.key) + ); + } } } @@ -1521,6 +1622,8 @@ async fn run_neighbor_sync_round( config: &ReplicationConfig, sync_state: &Arc>, sync_history: &Arc>>, + sync_cycle_epoch: &Arc>, + repair_proofs: &Arc>, is_bootstrapping: &Arc>, bootstrap_state: &Arc>, ) { @@ -1532,29 +1635,37 @@ async fn run_neighbor_sync_round( // prune pass and DHT snapshot so other tasks are not starved. let cycle_complete = sync_state.read().await.is_cycle_complete(); if cycle_complete { + // A completed local neighbor-sync cycle matures key-specific repair + // proofs recorded in earlier epochs. 
+ { + let mut history = sync_history.write().await; + for record in history.values_mut() { + record.cycles_since_sync = record.cycles_since_sync.saturating_add(1); + } + } + let current_sync_epoch = { + let mut epoch = sync_cycle_epoch.write().await; + *epoch = epoch.saturating_add(1); + *epoch + }; + // Post-cycle pruning (Section 11) — runs without holding sync_state. // Remote prune-confirmation audits are storage-proof audits and only // run after bootstrap has drained. let allow_remote_prune_audits = !bootstrapping && bootstrap_state.read().await.is_drained(); - pruning::run_prune_pass( - &self_id, + pruning::run_prune_pass_with_context(pruning::PrunePassContext { + self_id: &self_id, storage, paid_list, p2p_node, config, sync_state, + repair_proofs, + current_sync_epoch, allow_remote_prune_audits, - ) + }) .await; - // Increment `cycles_since_sync` for all peers. - { - let mut history = sync_history.write().await; - for record in history.values_mut() { - record.cycles_since_sync = record.cycles_since_sync.saturating_add(1); - } - } - // Take fresh close-neighbor snapshot (DHT query, no lock held). let neighbors = neighbor_sync::snapshot_close_neighbors(p2p_node, &self_id, config.neighbor_sync_scope) @@ -1596,7 +1707,7 @@ async fn run_neighbor_sync_round( // Sync with each peer in the batch. 
for peer in &batch { - let response = neighbor_sync::sync_with_peer( + let outcome = neighbor_sync::sync_with_peer_with_outcome( peer, p2p_node, storage, @@ -1606,11 +1717,12 @@ async fn run_neighbor_sync_round( ) .await; - if let Some(resp) = response { + if let Some(outcome) = outcome { handle_sync_response( &self_id, peer, - &resp, + &outcome.response, + &outcome.sent_replica_hints, p2p_node, config, bootstrapping, @@ -1620,6 +1732,8 @@ async fn run_neighbor_sync_round( queues, sync_state, sync_history, + sync_cycle_epoch, + repair_proofs, ) .await; } else { @@ -1631,7 +1745,7 @@ async fn run_neighbor_sync_round( // Attempt sync with the replacement peer (if one was found). if let Some(replacement_peer) = replacement { - let replacement_resp = neighbor_sync::sync_with_peer( + let replacement_outcome = neighbor_sync::sync_with_peer_with_outcome( &replacement_peer, p2p_node, storage, @@ -1641,11 +1755,12 @@ async fn run_neighbor_sync_round( ) .await; - if let Some(resp) = replacement_resp { + if let Some(outcome) = replacement_outcome { handle_sync_response( &self_id, &replacement_peer, - &resp, + &outcome.response, + &outcome.sent_replica_hints, p2p_node, config, bootstrapping, @@ -1655,6 +1770,8 @@ async fn run_neighbor_sync_round( queues, sync_state, sync_history, + sync_cycle_epoch, + repair_proofs, ) .await; } @@ -1670,6 +1787,7 @@ async fn handle_sync_response( self_id: &PeerId, peer: &PeerId, resp: &NeighborSyncResponse, + sent_replica_hints: &[neighbor_sync::SentReplicaHint], p2p_node: &Arc, config: &ReplicationConfig, bootstrapping: bool, @@ -1679,6 +1797,8 @@ async fn handle_sync_response( queues: &Arc>, sync_state: &Arc>, sync_history: &Arc>>, + sync_cycle_epoch: &Arc>, + repair_proofs: &Arc>, ) { // Record successful sync. 
{ @@ -1739,6 +1859,7 @@ async fn handle_sync_response( let mut state = sync_state.write().await; state.clear_active_bootstrap_claim(peer); } + record_sent_replica_hints(peer, sent_replica_hints, repair_proofs, sync_cycle_epoch).await; let outcome = admit_and_queue_hints( self_id, peer, diff --git a/src/replication/neighbor_sync.rs b/src/replication/neighbor_sync.rs index f905161..897d41a 100644 --- a/src/replication/neighbor_sync.rs +++ b/src/replication/neighbor_sync.rs @@ -3,6 +3,7 @@ //! Round-robin cycle management: snapshot close neighbors, iterate through //! them in batches of `NEIGHBOR_SYNC_PEER_COUNT`, exchanging hint sets. +use std::collections::HashSet; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -20,6 +21,25 @@ use crate::replication::protocol::{ use crate::replication::types::NeighborSyncState; use crate::storage::LmdbStorage; +/// Replica hint sent to a peer, with the close-group snapshot used to decide +/// that hint. +#[derive(Debug)] +pub(crate) struct SentReplicaHint { + /// Key included in the replica hint set. + pub(crate) key: XorName, + /// Self-inclusive close group observed during hint construction. + pub(crate) close_peers: HashSet, +} + +/// Result of an outbound neighbor-sync exchange. +#[derive(Debug)] +pub(crate) struct NeighborSyncOutcome { + /// The peer's sync response. + pub(crate) response: NeighborSyncResponse, + /// Replica hints sent to the peer in our request. + pub(crate) sent_replica_hints: Vec, +} + /// Build replica hints for a specific peer. 
/// /// Returns keys that we believe the peer should hold (peer is among the @@ -35,6 +55,19 @@ pub async fn build_replica_hints_for_peer( p2p_node: &Arc, close_group_size: usize, ) -> Vec { + build_replica_hints_for_peer_with_close_groups(peer, storage, p2p_node, close_group_size) + .await + .into_iter() + .map(|hint| hint.key) + .collect() +} + +pub(crate) async fn build_replica_hints_for_peer_with_close_groups( + peer: &PeerId, + storage: &Arc, + p2p_node: &Arc, + close_group_size: usize, +) -> Vec { let all_keys = match storage.all_keys().await { Ok(keys) => keys, Err(e) => { @@ -49,8 +82,9 @@ pub async fn build_replica_hints_for_peer( let closest = dht .find_closest_nodes_local_with_self(&key, close_group_size) .await; - if closest.iter().any(|n| n.peer_id == *peer) { - hints.push(key); + let close_peers = closest.iter().map(|n| n.peer_id).collect::>(); + if close_peers.contains(peer) { + hints.push(SentReplicaHint { key, close_peers }); } } hints @@ -148,9 +182,31 @@ pub async fn sync_with_peer( config: &ReplicationConfig, is_bootstrapping: bool, ) -> Option { + sync_with_peer_with_outcome(peer, p2p_node, storage, paid_list, config, is_bootstrapping) + .await + .map(|outcome| outcome.response) +} + +pub(crate) async fn sync_with_peer_with_outcome( + peer: &PeerId, + p2p_node: &Arc, + storage: &Arc, + paid_list: &Arc, + config: &ReplicationConfig, + is_bootstrapping: bool, +) -> Option { // Build peer-targeted hint sets (Rule 7). 
- let replica_hints = - build_replica_hints_for_peer(peer, storage, p2p_node, config.close_group_size).await; + let sent_replica_hints = build_replica_hints_for_peer_with_close_groups( + peer, + storage, + p2p_node, + config.close_group_size, + ) + .await; + let replica_hints = sent_replica_hints + .iter() + .map(|hint| hint.key) + .collect::>(); let paid_hints = build_paid_hints_for_peer(peer, paid_list, p2p_node, config.paid_list_close_group_size) .await; @@ -193,7 +249,10 @@ pub async fn sync_with_peer( match ReplicationMessage::decode(&response.data) { Ok(decoded) => { if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body { - Some(resp) + Some(NeighborSyncOutcome { + response: resp, + sent_replica_hints, + }) } else { warn!("Unexpected response type from {peer} during sync"); None @@ -261,18 +320,49 @@ pub fn record_successful_sync(state: &mut NeighborSyncState, peer: &PeerId) { /// indicates whether the caller should process the sender's inbound hints. pub async fn handle_sync_request( sender: &PeerId, - _request: &NeighborSyncRequest, + request: &NeighborSyncRequest, p2p_node: &Arc, storage: &Arc, paid_list: &Arc, config: &ReplicationConfig, is_bootstrapping: bool, ) -> (NeighborSyncResponse, bool) { + let (response, _, sender_in_rt) = handle_sync_request_with_proofs( + sender, + request, + p2p_node, + storage, + paid_list, + config, + is_bootstrapping, + ) + .await; + (response, sender_in_rt) +} + +pub(crate) async fn handle_sync_request_with_proofs( + sender: &PeerId, + _request: &NeighborSyncRequest, + p2p_node: &Arc, + storage: &Arc, + paid_list: &Arc, + config: &ReplicationConfig, + is_bootstrapping: bool, +) -> (NeighborSyncResponse, Vec, bool) { let sender_in_rt = p2p_node.dht_manager().is_in_routing_table(sender).await; // Build outbound hints (always sent, even to non-RT peers). 
- let replica_hints = - build_replica_hints_for_peer(sender, storage, p2p_node, config.close_group_size).await; + let sent_replica_hints = build_replica_hints_for_peer_with_close_groups( + sender, + storage, + p2p_node, + config.close_group_size, + ) + .await; + let replica_hints = sent_replica_hints + .iter() + .map(|hint| hint.key) + .collect::>(); let paid_hints = build_paid_hints_for_peer( sender, paid_list, @@ -289,7 +379,7 @@ pub async fn handle_sync_request( }; // Rule 4-6: accept inbound hints only if sender is in LocalRT. - (response, sender_in_rt) + (response, sent_replica_hints, sender_in_rt) } // --------------------------------------------------------------------------- diff --git a/src/replication/pruning.rs b/src/replication/pruning.rs index 3ef1720..4618ab0 100644 --- a/src/replication/pruning.rs +++ b/src/replication/pruning.rs @@ -26,7 +26,7 @@ use crate::replication::protocol::{ compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage, ReplicationMessageBody, ABSENT_KEY_DIGEST, }; -use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState}; +use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState, RepairProofs}; use crate::storage::LmdbStorage; use super::REPLICATION_TRUST_WEIGHT; @@ -54,6 +54,28 @@ pub struct PruneResult { pub paid_entries_cleared: usize, } +/// Shared dependencies and switches for one prune pass. +pub struct PrunePassContext<'a> { + /// Local peer id. + pub self_id: &'a PeerId, + /// Local record storage. + pub storage: &'a Arc, + /// Persistent paid-list state. + pub paid_list: &'a Arc, + /// P2P node used for routing lookups and prune-confirmation audits. + pub p2p_node: &'a Arc, + /// Replication configuration. + pub config: &'a ReplicationConfig, + /// Neighbor-sync state, including prune cursor and bootstrap claims. + pub sync_state: &'a Arc>, + /// Key-specific repair proofs used to gate prune-confirmation audits. 
+ pub repair_proofs: &'a Arc>, + /// Current local neighbor-sync cycle epoch. + pub current_sync_epoch: u64, + /// Whether remote prune-confirmation audits are allowed this pass. + pub allow_remote_prune_audits: bool, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum PruneAuditStatus { Proven, @@ -81,6 +103,28 @@ struct RecordPruneCandidate { target_peers: Vec, } +struct RecordPruneKeyOutcome { + marked: bool, + state: RecordPruneKeyState, +} + +impl Default for RecordPruneKeyOutcome { + fn default() -> Self { + Self { + marked: false, + state: RecordPruneKeyState::None, + } + } +} + +enum RecordPruneKeyState { + None, + Cleared, + BootstrapDeferred, + BudgetDeferred, + Candidate(RecordPruneCandidate), +} + #[derive(Default)] struct PruneAuditReportState { audit_failures: RwLock>, @@ -103,6 +147,13 @@ struct PruneAuditReportState { /// - If self is in `PaidCloseGroup(K)`: clear `PaidOutOfRangeFirstSeen`. /// - If not in group: set timestamp if not already set; remove entry if the /// timestamp is at least `PRUNE_HYSTERESIS_DURATION` old. +/// +/// Compatibility wrapper for callers that have not adopted repair-proof +/// tracking. It preserves the original public signature, but it has no proof +/// table or advanced sync epoch to pass into record prune-confirmation audits. +/// Out-of-range records are therefore marked/deferred rather than deleted via +/// remote confirmation. The replication engine calls +/// [`run_prune_pass_with_context`] so it can pass real repair proofs. 
pub async fn run_prune_pass( self_id: &PeerId, storage: &Arc, @@ -112,19 +163,27 @@ pub async fn run_prune_pass( sync_state: &Arc>, allow_remote_prune_audits: bool, ) -> PruneResult { - let (stored_count, record_stats) = prune_stored_records( + let repair_proofs = Arc::new(RwLock::new(RepairProofs::new())); + run_prune_pass_with_context(PrunePassContext { self_id, storage, paid_list, p2p_node, config, sync_state, + repair_proofs: &repair_proofs, + current_sync_epoch: 0, allow_remote_prune_audits, - ) - .await; + }) + .await +} + +/// Execute one prune pass with repair-proof-gated remote confirmations. +pub async fn run_prune_pass_with_context(ctx: PrunePassContext<'_>) -> PruneResult { + let (stored_count, record_stats) = prune_stored_records(&ctx).await; let now = Instant::now(); let (paid_count, paid_stats) = - prune_paid_entries(self_id, paid_list, p2p_node, config, now).await; + prune_paid_entries(ctx.self_id, ctx.paid_list, ctx.p2p_node, ctx.config, now).await; let result = PruneResult { records_pruned: record_stats.pruned, @@ -143,16 +202,8 @@ pub async fn run_prune_pass( result } -async fn prune_stored_records( - self_id: &PeerId, - storage: &Arc, - paid_list: &Arc, - p2p_node: &Arc, - config: &ReplicationConfig, - sync_state: &Arc>, - allow_remote_prune_audits: bool, -) -> (usize, RecordPruneStats) { - let stored_keys = match storage.all_keys().await { +async fn prune_stored_records(ctx: &PrunePassContext<'_>) -> (usize, RecordPruneStats) { + let stored_keys = match ctx.storage.all_keys().await { Ok(keys) => keys, Err(e) => { warn!("Failed to read stored keys for pruning: {e}"); @@ -161,67 +212,44 @@ async fn prune_stored_records( }; let now = Instant::now(); - let dht = p2p_node.dht_manager(); + let dht = ctx.p2p_node.dht_manager(); let mut stats = RecordPruneStats::default(); let mut candidates = Vec::new(); let mut audit_challenge_budget = MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS; let mut budget_deferred = 0usize; let mut bootstrap_deferred = 0usize; - let 
scan_start = prune_scan_start(sync_state, stored_keys.len()).await; + let scan_start = prune_scan_start(ctx.sync_state, stored_keys.len()).await; let mut last_selected_offset = None; for offset in 0..stored_keys.len() { let key = &stored_keys[(scan_start + offset) % stored_keys.len()]; let closest: Vec = dht - .find_closest_nodes_local_with_self(key, config.close_group_size) + .find_closest_nodes_local_with_self(key, ctx.config.close_group_size) .await; - let is_responsible = closest.iter().any(|n| n.peer_id == *self_id); - if is_responsible { - if paid_list.record_out_of_range_since(key).is_some() { - paid_list.clear_record_out_of_range(key); - stats.cleared += 1; + let outcome = + evaluate_record_prune_key(ctx, key, &closest, now, &mut audit_challenge_budget).await; + if outcome.marked { + stats.marked += 1; + } + match outcome.state { + RecordPruneKeyState::None => {} + RecordPruneKeyState::Cleared => stats.cleared += 1, + RecordPruneKeyState::BootstrapDeferred => { + bootstrap_deferred = bootstrap_deferred.saturating_add(1); } - } else { - if paid_list.record_out_of_range_since(key).is_none() { - stats.marked += 1; + RecordPruneKeyState::BudgetDeferred => { + budget_deferred = budget_deferred.saturating_add(1); } - paid_list.set_record_out_of_range(key); - - if let Some(first_seen) = paid_list.record_out_of_range_since(key) { - let elapsed = now - .checked_duration_since(first_seen) - .unwrap_or(Duration::ZERO); - if elapsed >= config.prune_hysteresis_duration { - if !allow_remote_prune_audits { - bootstrap_deferred = bootstrap_deferred.saturating_add(1); - continue; - } - let target_peers = remote_close_group_peers(&closest, self_id); - if target_peers.is_empty() { - warn!( - "Cannot prune {}: current close group has no remote peers", - hex::encode(key) - ); - continue; - } - if target_peers.len() > audit_challenge_budget { - budget_deferred = budget_deferred.saturating_add(1); - continue; - } - audit_challenge_budget -= target_peers.len(); - 
last_selected_offset = Some(offset); - candidates.push(RecordPruneCandidate { - key: *key, - target_peers, - }); - } + RecordPruneKeyState::Candidate(candidate) => { + last_selected_offset = Some(offset); + candidates.push(candidate); } } } advance_prune_cursor( - sync_state, + ctx.sync_state, stored_keys.len(), scan_start, last_selected_offset, @@ -242,23 +270,112 @@ async fn prune_stored_records( ); } - let present_by_key = - collect_record_prune_proofs(&candidates, storage, p2p_node, config, sync_state).await; + let present_by_key = collect_record_prune_proofs( + &candidates, + ctx.storage, + ctx.p2p_node, + ctx.config, + ctx.sync_state, + ) + .await; let (keys_to_delete, revalidated_cleared) = revalidated_record_prune_keys( &candidates, &present_by_key, - self_id, - paid_list, - p2p_node, - config, + ctx.self_id, + ctx.paid_list, + ctx.p2p_node, + ctx.config, ) .await; stats.cleared += revalidated_cleared; - stats.pruned = delete_stored_records(&keys_to_delete, storage, paid_list).await; + stats.pruned = delete_stored_records( + &keys_to_delete, + ctx.storage, + ctx.paid_list, + ctx.repair_proofs, + ) + .await; (stored_keys.len(), stats) } +async fn evaluate_record_prune_key( + ctx: &PrunePassContext<'_>, + key: &XorName, + closest: &[DHTNode], + now: Instant, + audit_challenge_budget: &mut usize, +) -> RecordPruneKeyOutcome { + let mut outcome = RecordPruneKeyOutcome::default(); + let is_responsible = closest.iter().any(|node| node.peer_id == *ctx.self_id); + + if is_responsible { + if ctx.paid_list.record_out_of_range_since(key).is_some() { + ctx.paid_list.clear_record_out_of_range(key); + outcome.state = RecordPruneKeyState::Cleared; + } + return outcome; + } + + if ctx.paid_list.record_out_of_range_since(key).is_none() { + outcome.marked = true; + } + ctx.paid_list.set_record_out_of_range(key); + + let Some(first_seen) = ctx.paid_list.record_out_of_range_since(key) else { + return outcome; + }; + let elapsed = now + .checked_duration_since(first_seen) + 
+ .unwrap_or(Duration::ZERO); + if elapsed < ctx.config.prune_hysteresis_duration { + return outcome; + } + + if !ctx.allow_remote_prune_audits { + outcome.state = RecordPruneKeyState::BootstrapDeferred; + return outcome; + } + + let target_peers = remote_close_group_peers(closest, ctx.self_id); + if target_peers.is_empty() { + warn!( + "Cannot prune {}: current close group has no remote peers", + hex::encode(key) + ); + return outcome; + } + + let current_close_peers: HashSet = closest.iter().map(|node| node.peer_id).collect(); + if !target_peers_have_mature_repair_proofs( + key, + &target_peers, + &current_close_peers, + ctx.repair_proofs, + ctx.current_sync_epoch, + ) + .await + { + debug!( + "Deferring prune for {} until current close group has mature repair proofs", + hex::encode(key) + ); + return outcome; + } + + if target_peers.len() > *audit_challenge_budget { + outcome.state = RecordPruneKeyState::BudgetDeferred; + return outcome; + } + + *audit_challenge_budget -= target_peers.len(); + outcome.state = RecordPruneKeyState::Candidate(RecordPruneCandidate { + key: *key, + target_peers, + }); + outcome +} + async fn prune_paid_entries( self_id: &PeerId, paid_list: &Arc, @@ -329,6 +446,19 @@ fn remote_close_group_peers(close_group: &[DHTNode], self_id: &PeerId) -> Vec, + repair_proofs: &Arc>, + current_sync_epoch: u64, +) -> bool { + let mut proofs = repair_proofs.write().await; + target_peers.iter().all(|peer| { + proofs.has_mature_replica_hint(peer, key, current_close_peers, current_sync_epoch) + }) +} + async fn prune_scan_start( sync_state: &Arc>, stored_key_count: usize, @@ -358,6 +488,7 @@ async fn delete_stored_records( keys_to_delete: &[XorName], storage: &Arc, paid_list: &Arc, + repair_proofs: &Arc>, ) -> usize { let mut pruned = 0; @@ -367,6 +498,7 @@ async fn delete_stored_records( } else { pruned += 1; paid_list.clear_record_out_of_range(key); + repair_proofs.write().await.remove_key(key); // Seed the PaidForList out-of-range timer so the second pass can
// prune the entry sooner, closing the re-admission window between // the storage delete and the PaidForList prune pass. diff --git a/src/replication/types.rs b/src/replication/types.rs index be842e5..df93f24 100644 --- a/src/replication/types.rs +++ b/src/replication/types.rs @@ -247,6 +247,143 @@ impl PeerSyncRecord { } } +// --------------------------------------------------------------------------- +// Repair proof tracking +// --------------------------------------------------------------------------- + +/// Evidence that this node has sent a replica repair hint for a key to a peer. +#[derive(Debug, Clone)] +struct RepairProof { + /// Local neighbor-sync cycle epoch when the hint was sent. + hinted_at_epoch: u64, +} + +/// Repair proofs for one key, tied to the close-group snapshot they were +/// recorded against. +#[derive(Debug, Clone)] +struct RepairProofEntry { + /// Self-inclusive close group observed when these proofs were recorded. + close_peers: HashSet, + /// Per-peer proof metadata for peers in `close_peers`. + peer_proofs: HashMap, +} + +impl RepairProofEntry { + fn new(close_peers: HashSet) -> Self { + Self { + close_peers, + peer_proofs: HashMap::new(), + } + } +} + +/// Evidence that this node has sent replica repair hints for local keys. +/// +/// The map is keyed by record key so each key retains only one close-group +/// snapshot and at most that snapshot's peers. This bounds memory by local key +/// count times the replication close-group size rather than by churn history. +#[derive(Debug, Clone, Default)] +pub struct RepairProofs { + /// Key-scoped repair proofs. + proofs_by_key: HashMap, +} + +impl RepairProofs { + /// Create an empty repair-proof table. + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// Record that `peer` was sent a replica repair hint for `key`. + /// + /// `current_close_peers` must be the current self-inclusive close group for + /// `key`. 
If that close group differs from the previous proof snapshot, + /// proofs for peers that left the close group are invalidated before + /// recording. Stable peers keep their proofs, while a peer that leaves and + /// later re-enters still needs a fresh hint. + pub fn record_replica_hint_sent( + &mut self, + peer: PeerId, + key: XorName, + current_close_peers: &HashSet, + hinted_at_epoch: u64, + ) -> bool { + self.reconcile_key_close_group(&key, current_close_peers); + + if !current_close_peers.contains(&peer) { + return false; + } + + let entry = self + .proofs_by_key + .entry(key) + .or_insert_with(|| RepairProofEntry::new(current_close_peers.clone())); + + if entry.peer_proofs.contains_key(&peer) { + return false; + } + + entry + .peer_proofs + .insert(peer, RepairProof { hinted_at_epoch }); + true + } + + /// Whether this node has mature repair-hint evidence for `(peer, key)`. + /// + /// The check invalidates proofs for peers that have left the current + /// self-inclusive close group. A proof is mature only after at least one + /// later local sync-cycle epoch. + pub fn has_mature_replica_hint( + &mut self, + peer: &PeerId, + key: &XorName, + current_close_peers: &HashSet, + current_epoch: u64, + ) -> bool { + self.reconcile_key_close_group(key, current_close_peers); + + self.proofs_by_key + .get(key) + .and_then(|entry| entry.peer_proofs.get(peer)) + .is_some_and(|proof| proof.hinted_at_epoch < current_epoch) + } + + /// Remove all repair proofs for a key, e.g. after local deletion. + pub fn remove_key(&mut self, key: &XorName) { + self.proofs_by_key.remove(key); + } + + /// Remove all repair proofs for a peer, e.g. after routing-table removal. 
+ pub fn remove_peer(&mut self, peer: &PeerId) { + self.proofs_by_key.retain(|_, entry| { + entry.peer_proofs.remove(peer); + !entry.peer_proofs.is_empty() + }); + } + + fn reconcile_key_close_group(&mut self, key: &XorName, current_close_peers: &HashSet) { + let should_remove = if let Some(entry) = self.proofs_by_key.get_mut(key) { + if entry.close_peers == *current_close_peers { + return; + } + + entry.close_peers.clone_from(current_close_peers); + entry + .peer_proofs + .retain(|peer, _| current_close_peers.contains(peer)); + entry.peer_proofs.is_empty() + } else { + false + }; + + if should_remove { + self.proofs_by_key.remove(key); + } + } +} + // --------------------------------------------------------------------------- // Neighbor sync cycle state // --------------------------------------------------------------------------- @@ -583,6 +720,199 @@ mod tests { ); } + // -- RepairProofs -------------------------------------------------------- + + #[test] + fn repair_proofs_record_sent_hint_for_close_peer() { + const HINT_EPOCH: u64 = 7; + const CURRENT_EPOCH: u64 = HINT_EPOCH + 1; + + let key = [0xA1; 32]; + let peer = peer_id_from_byte(1); + let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]); + let mut proofs = RepairProofs::new(); + + assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH)); + + assert!( + proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH), + "sent hint should make key auditable for that peer" + ); + } + + #[test] + fn repair_proofs_reject_peer_outside_current_close_group() { + const HINT_EPOCH: u64 = 7; + const CURRENT_EPOCH: u64 = HINT_EPOCH + 1; + + let key = [0xA2; 32]; + let peer = peer_id_from_byte(1); + let close_peers = HashSet::from([peer_id_from_byte(2), peer_id_from_byte(3)]); + let mut proofs = RepairProofs::new(); + + assert!(!proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH)); + + assert!( + !proofs.has_mature_replica_hint(&peer, 
&key, &close_peers, CURRENT_EPOCH), + "peers outside current close group must not get repair proof" + ); + } + + #[test] + fn repair_proofs_require_later_epoch() { + const HINT_EPOCH: u64 = 7; + const CURRENT_EPOCH: u64 = HINT_EPOCH + 1; + + let key = [0xA3; 32]; + let peer = peer_id_from_byte(1); + let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]); + let mut proofs = RepairProofs::new(); + + assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH)); + + assert!( + !proofs.has_mature_replica_hint(&peer, &key, &close_peers, HINT_EPOCH), + "same-cycle proof should not be audit-eligible" + ); + assert!( + proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH), + "proof should mature after a later local sync-cycle epoch" + ); + } + + #[test] + fn repair_proofs_repeated_hint_does_not_reset_maturity() { + const HINT_EPOCH: u64 = 7; + const REPEATED_HINT_EPOCH: u64 = HINT_EPOCH + 1; + + let key = [0xA5; 32]; + let peer = peer_id_from_byte(1); + let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]); + let mut proofs = RepairProofs::new(); + + assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH)); + assert!( + !proofs.record_replica_hint_sent(peer, key, &close_peers, REPEATED_HINT_EPOCH), + "duplicate hint in the same close group should keep existing proof" + ); + assert!( + proofs.has_mature_replica_hint(&peer, &key, &close_peers, REPEATED_HINT_EPOCH), + "duplicate hint must not reset an already mature proof" + ); + } + + #[test] + fn repair_proofs_retain_stable_peers_on_close_group_change() { + const HINT_EPOCH: u64 = 7; + const CURRENT_EPOCH: u64 = HINT_EPOCH + 1; + + let key = [0xA7; 32]; + let stable_peer = peer_id_from_byte(1); + let departing_peer = peer_id_from_byte(2); + let retained_peer = peer_id_from_byte(3); + let new_peer = peer_id_from_byte(4); + let old_group = HashSet::from([stable_peer, departing_peer, retained_peer]); + let 
changed_group = HashSet::from([stable_peer, retained_peer, new_peer]); + let mut proofs = RepairProofs::new(); + + assert!(proofs.record_replica_hint_sent(stable_peer, key, &old_group, HINT_EPOCH)); + assert!(proofs.record_replica_hint_sent(departing_peer, key, &old_group, HINT_EPOCH)); + + assert!( + proofs.has_mature_replica_hint(&stable_peer, &key, &changed_group, CURRENT_EPOCH), + "stable peers should keep mature repair proofs across unrelated close-group churn" + ); + assert!( + !proofs.has_mature_replica_hint(&departing_peer, &key, &changed_group, CURRENT_EPOCH), + "peers that left the close group should lose repair proofs" + ); + assert!( + !proofs.has_mature_replica_hint(&new_peer, &key, &changed_group, CURRENT_EPOCH), + "new close-group peers need their own repair hint before auditing" + ); + } + + #[test] + fn repair_proofs_evicted_peer_reentry_requires_fresh_hint() { + const FIRST_HINT_EPOCH: u64 = 7; + const SECOND_HINT_EPOCH: u64 = FIRST_HINT_EPOCH + 1; + const CURRENT_EPOCH: u64 = SECOND_HINT_EPOCH + 1; + + let key = [0xA3; 32]; + let returning_peer = peer_id_from_byte(1); + let new_peer = peer_id_from_byte(4); + let old_group = HashSet::from([returning_peer, peer_id_from_byte(2), peer_id_from_byte(3)]); + let changed_group = HashSet::from([new_peer, peer_id_from_byte(2), peer_id_from_byte(3)]); + let mut proofs = RepairProofs::new(); + + assert!(proofs.record_replica_hint_sent(returning_peer, key, &old_group, FIRST_HINT_EPOCH,)); + + assert!( + !proofs.has_mature_replica_hint(&new_peer, &key, &changed_group, SECOND_HINT_EPOCH), + "new close-group peer should not inherit another peer's repair proof" + ); + assert!( + !proofs.has_mature_replica_hint(&returning_peer, &key, &old_group, CURRENT_EPOCH), + "a peer that re-enters must receive a fresh repair hint" + ); + + assert!(proofs.record_replica_hint_sent( + returning_peer, + key, + &old_group, + SECOND_HINT_EPOCH, + )); + assert!( + proofs.has_mature_replica_hint(&returning_peer, &key, &old_group, 
CURRENT_EPOCH), + "fresh repair hint after re-entry should be eligible once mature" + ); + } + + #[test] + fn repair_proofs_remove_peer_requires_fresh_hint_after_reentry() { + const FIRST_HINT_EPOCH: u64 = 7; + const SECOND_HINT_EPOCH: u64 = FIRST_HINT_EPOCH + 1; + const CURRENT_EPOCH: u64 = SECOND_HINT_EPOCH + 1; + + let key = [0xA6; 32]; + let peer = peer_id_from_byte(1); + let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]); + let mut proofs = RepairProofs::new(); + + assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, FIRST_HINT_EPOCH)); + proofs.remove_peer(&peer); + + assert!( + !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH), + "routing-table removal should clear proof even if peer re-enters same close group" + ); + + assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, SECOND_HINT_EPOCH)); + assert!( + proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH), + "fresh hint after re-entry should become eligible after a later epoch" + ); + } + + #[test] + fn repair_proofs_remove_key_clears_all_peer_entries() { + const HINT_EPOCH: u64 = 7; + const CURRENT_EPOCH: u64 = HINT_EPOCH + 1; + + let key = [0xA4; 32]; + let peer = peer_id_from_byte(1); + let close_peers = HashSet::from([peer]); + let mut proofs = RepairProofs::new(); + + assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH)); + proofs.remove_key(&key); + + assert!( + !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH), + "deleted local key should not retain repair proof entries" + ); + } + // -- NeighborSyncState ------------------------------------------------- #[test] diff --git a/tests/e2e/replication.rs b/tests/e2e/replication.rs index 36fda3c..83fc792 100644 --- a/tests/e2e/replication.rs +++ b/tests/e2e/replication.rs @@ -15,11 +15,12 @@ use ant_node::replication::protocol::{ }; use ant_node::replication::pruning; use 
ant_node::replication::scheduling::ReplicationQueues; -use ant_node::replication::types::NeighborSyncState; +use ant_node::replication::types::{NeighborSyncState, RepairProofs}; use ant_node::ReplicationConfig; use saorsa_core::identity::PeerId; use saorsa_core::{P2PNode, TrustEvent}; use serial_test::serial; +use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; use tokio::sync::RwLock; @@ -131,6 +132,31 @@ async fn store_record_on_peers( } } +async fn record_repair_proofs_for_peers( + repair_proofs: &Arc<RwLock<RepairProofs>>, + p2p_node: &Arc<P2PNode>, + config: &ReplicationConfig, + peers: &[PeerId], + key: &[u8; 32], + hinted_at_epoch: u64, +) { + let close_peers: HashSet<PeerId> = p2p_node + .dht_manager() + .find_closest_nodes_local_with_self(key, config.close_group_size) + .await + .iter() + .map(|node| node.peer_id) + .collect(); + let mut proofs = repair_proofs.write().await; + for peer in peers { + assert!( + proofs.record_replica_hint_sent(*peer, *key, &close_peers, hinted_at_epoch), + "test target should be in close group for repair-proof recording" + ); + } + drop(proofs); +} + /// Fresh write happy path (Section 18 #1). 
/// /// Store a chunk on a node that has a `ReplicationEngine`, manually call @@ -448,6 +474,9 @@ async fn test_audit_absent_key_returns_sentinel() { #[tokio::test] #[serial] async fn test_prune_pass_requires_remote_confirmation_before_delete() { + const HINT_EPOCH: u64 = 7; + const CURRENT_EPOCH: u64 = HINT_EPOCH + 1; + let harness = TestHarness::setup_minimal().await.expect("setup"); harness.warmup_dht().await.expect("warmup"); @@ -455,6 +484,7 @@ async fn test_prune_pass_requires_remote_confirmation_before_delete() { let close_group_size = 2; let config = prune_test_config(close_group_size); let sync_state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(vec![]))); + let repair_proofs = Arc::new(RwLock::new(RepairProofs::new())); let pruner = harness.test_node(pruner_idx).expect("pruner"); let pruner_p2p = Arc::clone(pruner.p2p_node.as_ref().expect("pruner p2p")); @@ -478,32 +508,45 @@ async fn test_prune_pass_requires_remote_confirmation_before_delete() { .await .expect("put gate record on pruner"); store_record_on_peers(&harness, &gate_targets, &gate_address, &gate_content).await; - - let blocked = pruning::run_prune_pass( - &pruner_peer, - &pruner_storage, - &pruner_paid_list, + record_repair_proofs_for_peers( + &repair_proofs, &pruner_p2p, &config, - &sync_state, - false, + &gate_targets, + &gate_address, + HINT_EPOCH, ) .await; + + let blocked = pruning::run_prune_pass_with_context(pruning::PrunePassContext { + self_id: &pruner_peer, + storage: &pruner_storage, + paid_list: &pruner_paid_list, + p2p_node: &pruner_p2p, + config: &config, + sync_state: &sync_state, + repair_proofs: &repair_proofs, + current_sync_epoch: CURRENT_EPOCH, + allow_remote_prune_audits: false, + }) + .await; assert_eq!(blocked.records_pruned, 0); assert!( pruner_storage.exists(&gate_address).expect("exists"), "record must not be pruned before remote audits are allowed" ); - let confirmed = pruning::run_prune_pass( - &pruner_peer, - &pruner_storage, - &pruner_paid_list, - 
&pruner_p2p, - &config, - &sync_state, - true, - ) + let confirmed = pruning::run_prune_pass_with_context(pruning::PrunePassContext { + self_id: &pruner_peer, + storage: &pruner_storage, + paid_list: &pruner_paid_list, + p2p_node: &pruner_p2p, + config: &config, + sync_state: &sync_state, + repair_proofs: &repair_proofs, + current_sync_epoch: CURRENT_EPOCH, + allow_remote_prune_audits: true, + }) .await; assert_eq!(confirmed.records_pruned, 1); assert!( @@ -526,17 +569,28 @@ async fn test_prune_pass_requires_remote_confirmation_before_delete() { &missing_content, ) .await; - - let incomplete = pruning::run_prune_pass( - &pruner_peer, - &pruner_storage, - &pruner_paid_list, + record_repair_proofs_for_peers( + &repair_proofs, &pruner_p2p, &config, - &sync_state, - true, + &missing_targets, + &missing_address, + HINT_EPOCH, ) .await; + + let incomplete = pruning::run_prune_pass_with_context(pruning::PrunePassContext { + self_id: &pruner_peer, + storage: &pruner_storage, + paid_list: &pruner_paid_list, + p2p_node: &pruner_p2p, + config: &config, + sync_state: &sync_state, + repair_proofs: &repair_proofs, + current_sync_epoch: CURRENT_EPOCH, + allow_remote_prune_audits: true, + }) + .await; assert_eq!(incomplete.records_pruned, 0); assert!( pruner_storage.exists(&missing_address).expect("exists"), @@ -551,15 +605,17 @@ async fn test_prune_pass_requires_remote_confirmation_before_delete() { ) .await; - let complete = pruning::run_prune_pass( - &pruner_peer, - &pruner_storage, - &pruner_paid_list, - &pruner_p2p, - &config, - &sync_state, - true, - ) + let complete = pruning::run_prune_pass_with_context(pruning::PrunePassContext { + self_id: &pruner_peer, + storage: &pruner_storage, + paid_list: &pruner_paid_list, + p2p_node: &pruner_p2p, + config: &config, + sync_state: &sync_state, + repair_proofs: &repair_proofs, + current_sync_epoch: CURRENT_EPOCH, + allow_remote_prune_audits: true, + }) .await; assert_eq!(complete.records_pruned, 1); assert!(