Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions src/replication/admission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use saorsa_core::identity::PeerId;
use saorsa_core::P2PNode;

use crate::ant_protocol::XorName;
use crate::replication::config::ReplicationConfig;
use crate::replication::config::{storage_admission_width, ReplicationConfig};
use crate::replication::paid_list::PaidList;
use crate::storage::LmdbStorage;

Expand All @@ -31,19 +31,20 @@ pub struct AdmissionResult {
pub rejected_keys: Vec<XorName>,
}

/// Check if this node is responsible for key `K`.
/// Check if this node is within a caller-supplied closest-peer width for key
/// `K`.
///
/// Returns `true` if `self_id` is among the `close_group_size` nearest peers
/// to `K` in `SelfInclusiveRT`.
/// Returns `true` if `self_id` is among the `responsibility_width` nearest
/// peers to `K` in `SelfInclusiveRT`.
pub async fn is_responsible(
self_id: &PeerId,
key: &XorName,
p2p_node: &Arc<P2PNode>,
close_group_size: usize,
responsibility_width: usize,
) -> bool {
let closest = p2p_node
.dht_manager()
.find_closest_nodes_local_with_self(key, close_group_size)
.find_closest_nodes_local_with_self(key, responsibility_width)
.await;
closest.iter().any(|n| n.peer_id == *self_id)
}
Expand All @@ -70,8 +71,9 @@ pub async fn is_in_paid_close_group(
/// For each key in `replica_hints` and `paid_hints`:
/// - **Cross-set precedence**: if a key appears in both sets, keep only the
/// replica-hint entry.
/// - **Replica hints**: admitted if `IsResponsible(self, K)` or key already
/// exists in local store / pending set.
/// - **Replica hints**: admitted if `self` is in the storage-admission group
/// (`close_group_size + STORAGE_ADMISSION_MARGIN`) or key already exists in
/// local store / pending set.
/// - **Paid hints**: admitted if `self` is in `PaidCloseGroup(K)` or key is
/// already in `PaidForList`.
///
Expand Down Expand Up @@ -111,7 +113,14 @@ pub async fn admit_hints(
continue;
}

if is_responsible(self_id, &key, p2p_node, config.close_group_size).await {
if is_responsible(
self_id,
&key,
p2p_node,
storage_admission_width(config.close_group_size),
)
.await
{
result.replica_keys.push(key);
} else {
result.rejected_keys.push(key);
Expand Down Expand Up @@ -323,8 +332,9 @@ mod tests {
/// gate tested at the e2e level (scenario 17 tests the positive
/// case).
/// (b) Even if a sender IS in `LocalRT`, the per-key relevance check
/// (`is_responsible` / `is_in_paid_close_group`) in `admit_hints`
/// still applies. Sender identity does not grant key admission.
/// (`is_responsible` with storage-admission width /
/// `is_in_paid_close_group`) in `admit_hints` still applies. Sender
/// identity does not grant key admission.
///
/// This test exercises layer (b): the admission pipeline's dedup,
/// cross-set precedence, and relevance filtering using the same logic
Expand Down Expand Up @@ -358,8 +368,8 @@ mod tests {
admitted_replica.push(key);
continue;
}
// key_not_pending: not pending, not local -> needs is_responsible.
// Simulate is_responsible returning false (out of range).
// key_not_pending: not pending, not local -> needs the
// storage-admission check. Simulate it returning false.
let is_responsible = false;
if is_responsible {
admitted_replica.push(key);
Expand Down Expand Up @@ -416,7 +426,7 @@ mod tests {
/// Scenario 7: Out-of-range key hint rejected regardless of quorum.
///
/// A key whose XOR distance from self is much larger than the distance
/// of the close-group members fails the `is_responsible` check in
/// of the storage-admission members fails the `is_responsible` check in
/// `admit_hints`. The key never enters the verification pipeline, so
/// quorum is irrelevant.
///
Expand All @@ -441,7 +451,7 @@ mod tests {

// -- Simulate admit_hints for these keys --
//
// When `close_group_size` peers are all closer to far_key than
// When the storage-admission peers are all closer to far_key than
// self, `is_responsible(self, far_key)` returns false. The key is
// rejected without entering verification or quorum.

Expand All @@ -460,8 +470,8 @@ mod tests {
admitted.push(key);
continue;
}
// Simulate is_responsible: self (0x00) has close_group_size
// peers closer to far_key (0xFF) than itself -> not responsible.
// Simulate is_responsible: self (0x00) has the full
// storage-admission group closer to far_key (0xFF) than itself.
// For close_key (0x01), self is very close -> responsible.
let distance = xor_distance(&self_xor, &key);
let simulated_responsible = distance[0] < 0x80;
Expand Down
26 changes: 26 additions & 0 deletions src/replication/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ use crate::ant_protocol::CLOSE_GROUP_SIZE;
/// Maximum number of peers per k-bucket in the Kademlia routing table.
pub const K_BUCKET_SIZE: usize = 20;

/// Extra local-routing-table positions accepted for local chunk storage
/// admission and stored-record pruning.
///
/// This margin absorbs small local RT disagreement between peers. It does not
/// widen audit, quorum, or paid-list target sets; those remain strict
/// `close_group_size` / paid-list group checks.
pub const STORAGE_ADMISSION_MARGIN: usize = 2;

/// Full-network target for required positive presence votes.
///
/// Effective per-key threshold is
Expand All @@ -39,6 +47,13 @@ pub const NEIGHBOR_SYNC_SCOPE: usize = 20;
/// round.
pub const NEIGHBOR_SYNC_PEER_COUNT: usize = 4;

/// Width used when deciding whether this node may locally store or retain a
/// chunk.
#[must_use]
pub const fn storage_admission_width(close_group_size: usize) -> usize {
close_group_size.saturating_add(STORAGE_ADMISSION_MARGIN)
}

/// Minimum neighbor-sync cadence. Actual interval is randomized within
/// `[min, max]`.
const NEIGHBOR_SYNC_INTERVAL_MIN_SECS: u64 = 10 * 60;
Expand Down Expand Up @@ -411,6 +426,17 @@ mod tests {
);
}

#[test]
fn storage_admission_width_adds_margin() {
const TEST_CLOSE_GROUP_SIZE: usize = 7;

assert_eq!(
storage_admission_width(TEST_CLOSE_GROUP_SIZE),
TEST_CLOSE_GROUP_SIZE + STORAGE_ADMISSION_MARGIN
);
assert_eq!(storage_admission_width(usize::MAX), usize::MAX);
}

#[test]
fn audit_failure_weight_is_five() {
assert!((AUDIT_FAILURE_TRUST_WEIGHT - 5.0).abs() <= f64::EPSILON);
Expand Down
51 changes: 35 additions & 16 deletions src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ use crate::error::{Error, Result};
use crate::payment::{PaymentVerifier, VerificationContext};
use crate::replication::audit::AuditTickResult;
use crate::replication::config::{
max_parallel_fetch, ReplicationConfig, MAX_CONCURRENT_REPLICATION_SENDS,
REPLICATION_PROTOCOL_ID,
max_parallel_fetch, storage_admission_width, ReplicationConfig,
MAX_CONCURRENT_REPLICATION_SENDS, REPLICATION_PROTOCOL_ID,
};
use crate::replication::paid_list::PaidList;
use crate::replication::protocol::{
Expand Down Expand Up @@ -1176,15 +1176,23 @@ async fn handle_fresh_offer(
return Ok(());
}

// Rule 7: check responsibility.
if !admission::is_responsible(&self_id, &offer.key, p2p_node, config.close_group_size).await {
// Rule 7: check storage admission. Fresh chunk receivers accept the close
// group plus a small margin to absorb local routing-table disagreement.
if !admission::is_responsible(
&self_id,
&offer.key,
p2p_node,
storage_admission_width(config.close_group_size),
)
.await
{
send_replication_response(
source,
p2p_node,
request_id,
ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
key: offer.key,
reason: "Not responsible for this key".to_string(),
reason: "Not in storage-admission range for this key".to_string(),
}),
rr_message_id,
)
Expand Down Expand Up @@ -1220,7 +1228,7 @@ async fn handle_fresh_offer(

// Gap 1: Validate PoP via PaymentVerifier. Fresh replication is still
// part of the immediate write fan-out: this receiver is about to store the
// record as if the client had PUT it here directly. Receiver responsibility
// record as if the client had PUT it here directly. Storage admission
// was checked above before proof work. ClientPut verification applies
// store-strength cache semantics, paid-quote issuer close-group and local
// price floor checks for single-node proofs, and merkle candidate
Expand Down Expand Up @@ -2141,9 +2149,9 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
match pipeline {
HintPipeline::PaidOnly => {
// Paid-only + local paid state needs one more
// responsibility check outside this lock: if we
// are also in the storage close group, the hint
// can repair a missing replica.
// storage-admission check outside this lock: if we
// are also in the close group plus storage margin,
// the hint can repair a missing replica.
local_paid_paid_only_keys.push(*key);
}
HintPipeline::Replica => {
Expand All @@ -2166,8 +2174,13 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
for key in local_paid_paid_only_keys {
if storage.exists(&key).unwrap_or(false) {
terminal_paid_only.push(key);
} else if admission::is_responsible(&self_id, &key, p2p_node, config.close_group_size)
.await
} else if admission::is_responsible(
&self_id,
&key,
p2p_node,
storage_admission_width(config.close_group_size),
)
.await
{
local_paid_presence_probe_keys.push(key);
} else {
Expand Down Expand Up @@ -2276,9 +2289,9 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
}

// Paid-only hints normally update PaidForList only. If this node is
// also storage-responsible for the key, a verified paid-only hint can
// safely repair a missing replica using sources from the same
// verification round.
// also within the storage-admission group for the key, a verified
// paid-only hint can safely repair a missing replica using sources
// from the same verification round.
let mut paid_only_fetch_keys: HashSet<XorName> = HashSet::new();
for (key, outcome, pipeline) in &evaluated {
if *pipeline == HintPipeline::PaidOnly
Expand All @@ -2288,7 +2301,13 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
| KeyVerificationOutcome::PaidListVerified { .. }
)
&& !storage.exists(key).unwrap_or(false)
&& admission::is_responsible(&self_id, key, p2p_node, config.close_group_size).await
&& admission::is_responsible(
&self_id,
key,
p2p_node,
storage_admission_width(config.close_group_size),
)
.await
{
paid_only_fetch_keys.insert(*key);
}
Expand All @@ -2313,7 +2332,7 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) {
// retained as pending until queue drains.
} else if fetch_eligible && sources.is_empty() {
warn!(
"Verified responsible key {} has no holders (possible data loss)",
"Verified storage-admitted key {} has no holders (possible data loss)",
hex::encode(key)
);
q.remove_pending(&key);
Expand Down
Loading
Loading