From 8cb2c9ffaefbfff655440f03536de468ab251f15 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 12 Jun 2026 16:24:14 +0200 Subject: [PATCH] fix(replication): widen local storage admission range --- src/replication/admission.rs | 44 ++++++++++++++--------- src/replication/config.rs | 26 ++++++++++++++ src/replication/mod.rs | 51 +++++++++++++++++--------- src/replication/pruning.rs | 69 +++++++++++++++++++++++++----------- src/storage/handler.rs | 20 ++++++----- tests/e2e/replication.rs | 20 +++++++++-- 6 files changed, 166 insertions(+), 64 deletions(-) diff --git a/src/replication/admission.rs b/src/replication/admission.rs index b996eaa0..3b99a802 100644 --- a/src/replication/admission.rs +++ b/src/replication/admission.rs @@ -15,7 +15,7 @@ use saorsa_core::identity::PeerId; use saorsa_core::P2PNode; use crate::ant_protocol::XorName; -use crate::replication::config::ReplicationConfig; +use crate::replication::config::{storage_admission_width, ReplicationConfig}; use crate::replication::paid_list::PaidList; use crate::storage::LmdbStorage; @@ -31,19 +31,20 @@ pub struct AdmissionResult { pub rejected_keys: Vec, } -/// Check if this node is responsible for key `K`. +/// Check if this node is within a caller-supplied closest-peer width for key +/// `K`. /// -/// Returns `true` if `self_id` is among the `close_group_size` nearest peers -/// to `K` in `SelfInclusiveRT`. +/// Returns `true` if `self_id` is among the `responsibility_width` nearest +/// peers to `K` in `SelfInclusiveRT`. pub async fn is_responsible( self_id: &PeerId, key: &XorName, p2p_node: &Arc, - close_group_size: usize, + responsibility_width: usize, ) -> bool { let closest = p2p_node .dht_manager() - .find_closest_nodes_local_with_self(key, close_group_size) + .find_closest_nodes_local_with_self(key, responsibility_width) .await; closest.iter().any(|n| n.peer_id == *self_id) } @@ -70,8 +71,9 @@ pub async fn is_in_paid_close_group( /// For each key in `replica_hints` and `paid_hints`: /// - **Cross-set precedence**: if a key appears in both sets, keep only the /// replica-hint entry. -/// - **Replica hints**: admitted if `IsResponsible(self, K)` or key already -/// exists in local store / pending set. +/// - **Replica hints**: admitted if `self` is in the storage-admission group +/// (`close_group_size + STORAGE_ADMISSION_MARGIN`) or key already exists in +/// local store / pending set. /// - **Paid hints**: admitted if `self` is in `PaidCloseGroup(K)` or key is /// already in `PaidForList`. /// @@ -111,7 +113,14 @@ pub async fn admit_hints( continue; } - if is_responsible(self_id, &key, p2p_node, config.close_group_size).await { + if is_responsible( + self_id, + &key, + p2p_node, + storage_admission_width(config.close_group_size), + ) + .await + { result.replica_keys.push(key); } else { result.rejected_keys.push(key); @@ -323,8 +332,9 @@ mod tests { /// gate tested at the e2e level (scenario 17 tests the positive /// case). /// (b) Even if a sender IS in `LocalRT`, the per-key relevance check - /// (`is_responsible` / `is_in_paid_close_group`) in `admit_hints` - /// still applies. Sender identity does not grant key admission. + /// (`is_responsible` with storage-admission width / + /// `is_in_paid_close_group`) in `admit_hints` still applies. Sender + /// identity does not grant key admission. /// /// This test exercises layer (b): the admission pipeline's dedup, /// cross-set precedence, and relevance filtering using the same logic @@ -358,8 +368,8 @@ mod tests { admitted_replica.push(key); continue; } - // key_not_pending: not pending, not local -> needs is_responsible. - // Simulate is_responsible returning false (out of range). + // key_not_pending: not pending, not local -> needs the + // storage-admission check. Simulate it returning false. let is_responsible = false; if is_responsible { admitted_replica.push(key); @@ -416,7 +426,7 @@ mod tests { /// Scenario 7: Out-of-range key hint rejected regardless of quorum. /// /// A key whose XOR distance from self is much larger than the distance - /// of the close-group members fails the `is_responsible` check in + /// of the storage-admission members fails the `is_responsible` check in /// `admit_hints`. The key never enters the verification pipeline, so /// quorum is irrelevant. /// @@ -441,7 +451,7 @@ mod tests { // -- Simulate admit_hints for these keys -- // - // When `close_group_size` peers are all closer to far_key than + // When the storage-admission peers are all closer to far_key than // self, `is_responsible(self, far_key)` returns false. The key is // rejected without entering verification or quorum. @@ -460,8 +470,8 @@ mod tests { admitted.push(key); continue; } - // Simulate is_responsible: self (0x00) has close_group_size - // peers closer to far_key (0xFF) than itself -> not responsible. + // Simulate is_responsible: self (0x00) has the full + // storage-admission group closer to far_key (0xFF) than itself. // For close_key (0x01), self is very close -> responsible. let distance = xor_distance(&self_xor, &key); let simulated_responsible = distance[0] < 0x80; diff --git a/src/replication/config.rs b/src/replication/config.rs index 3337cf23..1f5b00a1 100644 --- a/src/replication/config.rs +++ b/src/replication/config.rs @@ -23,6 +23,14 @@ use crate::ant_protocol::CLOSE_GROUP_SIZE; /// Maximum number of peers per k-bucket in the Kademlia routing table. pub const K_BUCKET_SIZE: usize = 20; +/// Extra local-routing-table positions accepted for local chunk storage +/// admission and stored-record pruning. +/// +/// This margin absorbs small local RT disagreement between peers. It does not +/// widen audit, quorum, or paid-list target sets; those remain strict +/// `close_group_size` / paid-list group checks. +pub const STORAGE_ADMISSION_MARGIN: usize = 2; + /// Full-network target for required positive presence votes. /// /// Effective per-key threshold is @@ -39,6 +47,13 @@ pub const NEIGHBOR_SYNC_SCOPE: usize = 20; /// round. pub const NEIGHBOR_SYNC_PEER_COUNT: usize = 4; +/// Width used when deciding whether this node may locally store or retain a +/// chunk. +#[must_use] +pub const fn storage_admission_width(close_group_size: usize) -> usize { + close_group_size.saturating_add(STORAGE_ADMISSION_MARGIN) +} + /// Minimum neighbor-sync cadence. Actual interval is randomized within /// `[min, max]`. const NEIGHBOR_SYNC_INTERVAL_MIN_SECS: u64 = 10 * 60; @@ -411,6 +426,17 @@ mod tests { ); } + #[test] + fn storage_admission_width_adds_margin() { + const TEST_CLOSE_GROUP_SIZE: usize = 7; + + assert_eq!( + storage_admission_width(TEST_CLOSE_GROUP_SIZE), + TEST_CLOSE_GROUP_SIZE + STORAGE_ADMISSION_MARGIN + ); + assert_eq!(storage_admission_width(usize::MAX), usize::MAX); + } + #[test] fn audit_failure_weight_is_five() { assert!((AUDIT_FAILURE_TRUST_WEIGHT - 5.0).abs() <= f64::EPSILON); diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 083a87ec..b8ef7224 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -47,8 +47,8 @@ use crate::error::{Error, Result}; use crate::payment::{PaymentVerifier, VerificationContext}; use crate::replication::audit::AuditTickResult; use crate::replication::config::{ - max_parallel_fetch, ReplicationConfig, MAX_CONCURRENT_REPLICATION_SENDS, - REPLICATION_PROTOCOL_ID, + max_parallel_fetch, storage_admission_width, ReplicationConfig, + MAX_CONCURRENT_REPLICATION_SENDS, REPLICATION_PROTOCOL_ID, }; use crate::replication::paid_list::PaidList; use crate::replication::protocol::{ @@ -1176,15 +1176,23 @@ async fn handle_fresh_offer( return Ok(()); } - // Rule 7: check responsibility. - if !admission::is_responsible(&self_id, &offer.key, p2p_node, config.close_group_size).await { + // Rule 7: check storage admission. Fresh chunk receivers accept the close + // group plus a small margin to absorb local routing-table disagreement. + if !admission::is_responsible( + &self_id, + &offer.key, + p2p_node, + storage_admission_width(config.close_group_size), + ) + .await + { send_replication_response( source, p2p_node, request_id, ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected { key: offer.key, - reason: "Not responsible for this key".to_string(), + reason: "Not in storage-admission range for this key".to_string(), }), rr_message_id, ) @@ -1220,7 +1228,7 @@ async fn handle_fresh_offer( // Gap 1: Validate PoP via PaymentVerifier. Fresh replication is still // part of the immediate write fan-out: this receiver is about to store the - // record as if the client had PUT it here directly. Receiver responsibility + // record as if the client had PUT it here directly. Storage admission // was checked above before proof work. ClientPut verification applies // store-strength cache semantics, paid-quote issuer close-group and local // price floor checks for single-node proofs, and merkle candidate @@ -2141,9 +2149,9 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { match pipeline { HintPipeline::PaidOnly => { // Paid-only + local paid state needs one more - // responsibility check outside this lock: if we - // are also in the storage close group, the hint - // can repair a missing replica. + // storage-admission check outside this lock: if we + // are also in the close group plus storage margin, + // the hint can repair a missing replica. local_paid_paid_only_keys.push(*key); } HintPipeline::Replica => { @@ -2166,8 +2174,13 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { for key in local_paid_paid_only_keys { if storage.exists(&key).unwrap_or(false) { terminal_paid_only.push(key); - } else if admission::is_responsible(&self_id, &key, p2p_node, config.close_group_size) - .await + } else if admission::is_responsible( + &self_id, + &key, + p2p_node, + storage_admission_width(config.close_group_size), + ) + .await { local_paid_presence_probe_keys.push(key); } else { @@ -2276,9 +2289,9 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { } // Paid-only hints normally update PaidForList only. If this node is - // also storage-responsible for the key, a verified paid-only hint can - // safely repair a missing replica using sources from the same - // verification round. + // also within the storage-admission group for the key, a verified + // paid-only hint can safely repair a missing replica using sources + // from the same verification round. let mut paid_only_fetch_keys: HashSet = HashSet::new(); for (key, outcome, pipeline) in &evaluated { if *pipeline == HintPipeline::PaidOnly @@ -2288,7 +2301,13 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { | KeyVerificationOutcome::PaidListVerified { .. } ) && !storage.exists(key).unwrap_or(false) - && admission::is_responsible(&self_id, key, p2p_node, config.close_group_size).await + && admission::is_responsible( + &self_id, + key, + p2p_node, + storage_admission_width(config.close_group_size), + ) + .await { paid_only_fetch_keys.insert(*key); } @@ -2313,7 +2332,7 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { // retained as pending until queue drains. } else if fetch_eligible && sources.is_empty() { warn!( - "Verified responsible key {} has no holders (possible data loss)", + "Verified storage-admitted key {} has no holders (possible data loss)", hex::encode(key) ); q.remove_pending(&key); diff --git a/src/replication/pruning.rs b/src/replication/pruning.rs index 7b8c7180..68ebb8cc 100644 --- a/src/replication/pruning.rs +++ b/src/replication/pruning.rs @@ -18,8 +18,8 @@ use tokio::sync::RwLock; use crate::ant_protocol::XorName; use crate::replication::config::{ - ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT, MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS, - REPLICATION_PROTOCOL_ID, + storage_admission_width, ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT, + MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS, REPLICATION_PROTOCOL_ID, }; use crate::replication::paid_list::PaidList; use crate::replication::protocol::{ @@ -195,10 +195,12 @@ struct PruneAuditReportState { /// Execute post-cycle responsibility pruning. /// /// For each stored record K: -/// - If `IsResponsible(self, K)`: clear `RecordOutOfRangeFirstSeen`. -/// - If not responsible: set timestamp if not already set; delete if the +/// - If `self` is within the storage-admission group +/// (`close_group_size + STORAGE_ADMISSION_MARGIN`): clear +/// `RecordOutOfRangeFirstSeen`. +/// - If not in that group: set timestamp if not already set; delete if the /// timestamp is at least `PRUNE_HYSTERESIS_DURATION` old and all but one -/// of the current close group prove they store the record. +/// of the strict current close group prove they store the record. /// /// For each `PaidForList` entry K: /// - If self is in `PaidCloseGroup(K)`: clear `PaidOutOfRangeFirstSeen`. @@ -280,7 +282,6 @@ async fn prune_stored_records(ctx: &PrunePassContext<'_>) -> (usize, RecordPrune }; let now = Instant::now(); - let dht = ctx.p2p_node.dht_manager(); let mut stats = RecordPruneStats::default(); let mut candidates = Vec::new(); let mut audit_challenge_budget = MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS; @@ -291,12 +292,18 @@ async fn prune_stored_records(ctx: &PrunePassContext<'_>) -> (usize, RecordPrune for offset in 0..stored_keys.len() { let key = &stored_keys[(scan_start + offset) % stored_keys.len()]; - let closest: Vec = dht - .find_closest_nodes_local_with_self(key, ctx.config.close_group_size) - .await; + let (storage_admission_group, strict_close_group) = + record_prune_lookup_groups(key, ctx.p2p_node, ctx.config).await; - let outcome = - evaluate_record_prune_key(ctx, key, &closest, now, &mut audit_challenge_budget).await; + let outcome = evaluate_record_prune_key( + ctx, + key, + &storage_admission_group, + &strict_close_group, + now, + &mut audit_challenge_budget, + ) + .await; if outcome.marked { stats.marked += 1; } @@ -367,15 +374,33 @@ async fn prune_stored_records(ctx: &PrunePassContext<'_>) -> (usize, RecordPrune (stored_keys.len(), stats) } +async fn record_prune_lookup_groups( + key: &XorName, + p2p_node: &Arc, + config: &ReplicationConfig, +) -> (Vec, Vec) { + let dht = p2p_node.dht_manager(); + let storage_admission_group = dht + .find_closest_nodes_local_with_self(key, storage_admission_width(config.close_group_size)) + .await; + let strict_close_group = dht + .find_closest_nodes_local_with_self(key, config.close_group_size) + .await; + (storage_admission_group, strict_close_group) +} + async fn evaluate_record_prune_key( ctx: &PrunePassContext<'_>, key: &XorName, - closest: &[DHTNode], + storage_admission_group: &[DHTNode], + strict_close_group: &[DHTNode], now: Instant, audit_challenge_budget: &mut usize, ) -> RecordPruneKeyOutcome { let mut outcome = RecordPruneKeyOutcome::default(); - let is_responsible = closest.iter().any(|node| node.peer_id == *ctx.self_id); + let is_responsible = storage_admission_group + .iter() + .any(|node| node.peer_id == *ctx.self_id); if is_responsible { if ctx.paid_list.record_out_of_range_since(key).is_some() { @@ -405,7 +430,7 @@ async fn evaluate_record_prune_key( return outcome; } - let target_peers = remote_close_group_peers(closest, ctx.self_id); + let target_peers = remote_close_group_peers(strict_close_group, ctx.self_id); if target_peers.is_empty() { warn!( "Cannot prune {}: current close group has no remote peers", @@ -417,7 +442,8 @@ async fn evaluate_record_prune_key( // Only peers we have hinted (mature repair proof) may be audited; the // proof threshold must be reachable among them. A never-synced peer in // the close group reduces the audit pool instead of vetoing the prune. - let current_close_peers: HashSet = closest.iter().map(|node| node.peer_id).collect(); + let current_close_peers: HashSet = + strict_close_group.iter().map(|node| node.peer_id).collect(); #[cfg(any(test, feature = "test-utils"))] let repair_proof_now = ctx.repair_proof_now.unwrap_or(now); #[cfg(not(any(test, feature = "test-utils")))] @@ -918,17 +944,18 @@ async fn revalidated_record_prune_keys( p2p_node: &Arc, config: &ReplicationConfig, ) -> (Vec, usize) { - let dht = p2p_node.dht_manager(); let mut keys_to_delete = Vec::new(); let mut cleared = 0; let now = Instant::now(); for candidate in candidates { - let closest: Vec = dht - .find_closest_nodes_local_with_self(&candidate.key, config.close_group_size) - .await; + let (storage_admission_group, strict_close_group) = + record_prune_lookup_groups(&candidate.key, p2p_node, config).await; - if closest.iter().any(|n| n.peer_id == *self_id) { + if storage_admission_group + .iter() + .any(|n| n.peer_id == *self_id) + { if paid_list .record_out_of_range_since(&candidate.key) .is_some() @@ -949,7 +976,7 @@ async fn revalidated_record_prune_keys( continue; } - let current_target_peers = remote_close_group_peers(&closest, self_id); + let current_target_peers = remote_close_group_peers(&strict_close_group, self_id); if current_target_peers.is_empty() { warn!( "Cannot prune {}: current close group has no remote peers", diff --git a/src/storage/handler.rs b/src/storage/handler.rs index d5421d09..d51d717a 100644 --- a/src/storage/handler.rs +++ b/src/storage/handler.rs @@ -38,6 +38,7 @@ use crate::client::compute_address; use crate::error::{Error, Result}; use crate::logging::{debug, info, warn}; use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext}; +use crate::replication::config::storage_admission_width; use crate::replication::fresh::FreshWriteEvent; use crate::storage::lmdb::LmdbStorage; use bytes::Bytes; @@ -58,7 +59,7 @@ pub struct AntProtocol { /// Quote generator for creating storage quotes. /// Also handles merkle candidate quote signing via ML-DSA-65. quote_generator: Arc, - /// P2P node handle used for direct PUT receiver responsibility checks. + /// P2P node handle used for direct PUT storage-admission checks. p2p_node: RwLock>>, /// Channel for notifying the replication engine about newly-stored chunks. fresh_write_tx: Option>, @@ -270,9 +271,10 @@ impl AntProtocol { Ok(false) => {} } - // 4. Check storage responsibility before payment verification. A node + // 4. Check storage admission before payment verification. A node // should only accept the actual chunk when its local routing table - // places it in the configured close group for the chunk address. + // places it in the configured close group plus storage margin for + // the chunk address. if let Err(e) = self.validate_store_membership(&address).await { return ChunkPutResponse::Error(e); } @@ -375,8 +377,9 @@ impl AntProtocol { return Ok(()); } return Err(ProtocolError::PaymentFailed(format!( - "ClientPut receiver is not among this node's local {} closest peers for {}", - self.payment_verifier.close_group_size(), + "ClientPut receiver is not among this node's local {} closest peers for {} \ + (close group plus storage margin)", + storage_admission_width(self.payment_verifier.close_group_size()), hex::encode(address) ))); } @@ -409,17 +412,18 @@ impl AntProtocol { }; let self_id = *p2p_node.peer_id(); - let close_group_size = self.payment_verifier.close_group_size(); + let admission_width = storage_admission_width(self.payment_verifier.close_group_size()); let closest = p2p_node .dht_manager() - .find_closest_nodes_local_with_self(address, close_group_size) + .find_closest_nodes_local_with_self(address, admission_width) .await; if closest.iter().any(|node| node.peer_id == self_id) { return Ok(()); } Err(ProtocolError::PaymentFailed(format!( - "ClientPut receiver {} is not among this node's local {close_group_size} closest peers for {}", + "ClientPut receiver {} is not among this node's local {admission_width} closest peers for {} \ + (close group plus storage margin)", self_id.to_hex(), hex::encode(address) ))) diff --git a/tests/e2e/replication.rs b/tests/e2e/replication.rs index 4d0dcaeb..cf5e9e09 100644 --- a/tests/e2e/replication.rs +++ b/tests/e2e/replication.rs @@ -7,7 +7,9 @@ use super::TestHarness; use ant_node::client::compute_address; -use ant_node::replication::config::{REPAIR_HINT_MIN_AGE, REPLICATION_PROTOCOL_ID}; +use ant_node::replication::config::{ + storage_admission_width, REPAIR_HINT_MIN_AGE, REPLICATION_PROTOCOL_ID, +}; use ant_node::replication::protocol::{ compute_audit_digest, AuditChallenge, AuditResponse, FetchRequest, FetchResponse, FreshReplicationOffer, FreshReplicationResponse, NeighborSyncRequest, ReplicationMessage, @@ -78,6 +80,7 @@ async fn find_remote_prune_candidate( let pruner = harness.test_node(pruner_idx).expect("pruner"); let pruner_p2p = pruner.p2p_node.as_ref().expect("pruner p2p"); let pruner_peer = *pruner_p2p.peer_id(); + let admission_width = storage_admission_width(close_group_size); for attempt in 0..10_000usize { let content = format!("prune-confirmation-{label}-{attempt}").into_bytes(); @@ -94,6 +97,16 @@ async fn find_remote_prune_candidate( if target_peers.contains(&pruner_peer) { continue; } + let storage_admission_group = pruner_p2p + .dht_manager() + .find_closest_nodes_local_with_self(&address, admission_width) + .await; + if storage_admission_group + .iter() + .any(|node| node.peer_id == pruner_peer) + { + continue; + } if target_peers .iter() .all(|peer| node_index_for_peer(harness, peer).is_some()) @@ -102,7 +115,10 @@ async fn find_remote_prune_candidate( } } - panic!("failed to find a {close_group_size}-peer prune candidate outside pruner {pruner_idx}"); + panic!( + "failed to find a {close_group_size}-peer prune candidate outside pruner {pruner_idx}'s \ + storage-admission range" + ); } async fn store_record_on_peer(