diff --git a/src/devnet.rs b/src/devnet.rs index 3dede92c..4662e286 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -10,6 +10,7 @@ use crate::payment::{ EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator, QuotingMetricsTracker, }; +use crate::replication::config::ReplicationConfig; use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig}; use evmlib::Network as EvmNetwork; use evmlib::RewardsAddress; @@ -550,9 +551,11 @@ impl Devnet { }; let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS); + let replication_config = ReplicationConfig::default(); let payment_config = PaymentVerifierConfig { evm: evm_config, cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY, + close_group_size: replication_config.close_group_size, local_rewards_address: rewards_address, }; let payment_verifier = PaymentVerifier::new(payment_config); @@ -611,10 +614,9 @@ impl Devnet { *node.state.write().await = NodeState::Running; if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { - // Wire the P2PNode into the payment verifier for merkle-closeness checks. - protocol - .payment_verifier_arc() - .attach_p2p_node(Arc::clone(p2p)); + // Wire P2P into AntProtocol for direct PUT responsibility and + // payment-proof closeness checks. + protocol.attach_p2p_node(Arc::clone(p2p)); let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); diff --git a/src/node.rs b/src/node.rs index 0926df24..151a3696 100644 --- a/src/node.rs +++ b/src/node.rs @@ -107,11 +107,15 @@ impl NodeBuilder { Some(Self::build_upgrade_monitor(&self.config, node_id_seed)) }; + let repl_config = ReplicationConfig::default(); + // Initialize ANT protocol handler for chunk storage and // wire the fresh-write channel so PUTs trigger replication. let (ant_protocol, fresh_write_rx) = if self.config.storage.enabled { let (fresh_write_tx, fresh_write_rx) = tokio::sync::mpsc::unbounded_channel(); - let mut protocol = Self::build_ant_protocol(&self.config, &identity).await?; + let mut protocol = + Self::build_ant_protocol(&self.config, &identity, repl_config.close_group_size) + .await?; protocol.set_fresh_write_sender(fresh_write_tx); (Some(Arc::new(protocol)), Some(fresh_write_rx)) } else { @@ -121,19 +125,16 @@ impl NodeBuilder { let p2p_arc = Arc::new(p2p_node); - // Wire the P2PNode handle into the payment verifier so merkle-payment - // checks can query the live DHT for peers actually closest to a pool - // midpoint (pay-yourself defence). + // Wire the P2PNode handle into AntProtocol so direct PUTs can verify + // close-group responsibility and payment proofs can query live-DHT + // closeness. if let Some(ref protocol) = ant_protocol { - protocol - .payment_verifier_arc() - .attach_p2p_node(Arc::clone(&p2p_arc)); + protocol.attach_p2p_node(Arc::clone(&p2p_arc)); } // Initialize replication engine (if storage is enabled) let replication_engine = if let (Some(ref protocol), Some(fresh_rx)) = (&ant_protocol, fresh_write_rx) { - let repl_config = ReplicationConfig::default(); let storage_arc = protocol.storage(); let payment_verifier_arc = protocol.payment_verifier_arc(); match ReplicationEngine::new( @@ -349,6 +350,7 @@ impl NodeBuilder { async fn build_ant_protocol( config: &NodeConfig, identity: &NodeIdentity, + close_group_size: usize, ) -> Result { // Create LMDB storage let storage_config = LmdbStorageConfig { @@ -378,6 +380,7 @@ impl NodeBuilder { network: evm_network, }, cache_capacity: config.payment.cache_capacity, + close_group_size, local_rewards_address: rewards_address, }; let payment_verifier = PaymentVerifier::new(payment_config); diff --git a/src/payment/cache.rs b/src/payment/cache.rs index 75994d91..174c45b8 100644 --- a/src/payment/cache.rs +++ b/src/payment/cache.rs @@ -19,23 +19,33 @@ const DEFAULT_CACHE_CAPACITY: usize = 100_000; /// This cache stores `XorName` values that have been verified to exist on the /// autonomi network, avoiding repeated network queries for the same data. /// -/// Each entry carries a flag recording whether the verification that inserted -/// it ran the full client-PUT check set (`true`) or only the -/// receipt-authenticity subset used for replication (`false`). A -/// replication-verified entry must not satisfy a later client-PUT fast-path — -/// the context-gated checks (own-quote freshness, local recipient, merkle -/// candidate closeness) were never run for it — while either kind of entry -/// satisfies a later replication check. +/// Each entry records which fresh proof verification level inserted it. A +/// paid-list entry must not satisfy a later client-PUT fast-path because +/// paid-list admission does not authorize storing the actual chunk. Stronger +/// entries satisfy weaker lookups. #[derive(Clone)] pub struct VerifiedCache { - /// Value: `true` if the entry was verified under the full client-PUT - /// check set, `false` if only under the replication subset. - inner: Arc>>, + inner: Arc>>, hits: Arc, misses: Arc, additions: Arc, } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum VerificationLevel { + PaidList, + ClientPut, +} + +impl VerificationLevel { + fn satisfies(self, required: Self) -> bool { + matches!( + (self, required), + (Self::PaidList, Self::PaidList) | (Self::ClientPut, Self::PaidList | Self::ClientPut) + ) + } +} + /// Cache statistics for monitoring. #[derive(Debug, Default, Clone, Copy)] pub struct CacheStats { @@ -86,11 +96,10 @@ impl VerifiedCache { } } - /// Check if a `XorName` is in the cache (verified under either check set). + /// Check if a `XorName` is in the cache (verified under any fresh check set). /// /// Returns `true` if the `XorName` is cached (verified to exist on autonomi). - /// Sufficient for replication-context lookups; client-PUT lookups must use - /// [`Self::contains_client_put_verified`]. + /// Paid-list and client-PUT lookups must use their stricter helpers. #[must_use] pub fn contains(&self, xorname: &XorName) -> bool { let found = self.inner.lock().get(xorname).is_some(); @@ -104,14 +113,42 @@ impl VerifiedCache { found } + /// Check if a `XorName` is cached AND its verification ran at least the + /// paid-list admission check set. + /// + /// A client-PUT entry returns `true` here because it passed the stricter + /// store-admission path at the caller. + #[must_use] + pub fn contains_paid_list_verified(&self, xorname: &XorName) -> bool { + let found = self + .inner + .lock() + .get(xorname) + .copied() + .is_some_and(|level| level.satisfies(VerificationLevel::PaidList)); + + if found { + self.hits.fetch_add(1, Ordering::Relaxed); + } else { + self.misses.fetch_add(1, Ordering::Relaxed); + } + + found + } + /// Check if a `XorName` is cached AND its verification ran the full - /// client-PUT check set. + /// client-PUT store-admission check set. /// - /// A replication-verified entry returns `false` here: it never passed the - /// client-PUT-only checks, so it must not let a later client PUT skip them. + /// Paid-list entries return `false` here because they did not pass the + /// client-PUT store-admission path. #[must_use] pub fn contains_client_put_verified(&self, xorname: &XorName) -> bool { - let found = self.inner.lock().get(xorname).copied() == Some(true); + let found = self + .inner + .lock() + .get(xorname) + .copied() + .is_some_and(|level| level.satisfies(VerificationLevel::ClientPut)); if found { self.hits.fetch_add(1, Ordering::Relaxed); @@ -125,27 +162,32 @@ impl VerifiedCache { /// Add a `XorName` verified under the full client-PUT check set. /// /// This should be called after verifying that data exists on the autonomi network. - /// Also upgrades an existing replication-verified entry. + /// Also upgrades an existing paid-list-verified entry. pub fn insert(&self, xorname: XorName) { - self.inner.lock().put(xorname, true); - self.additions.fetch_add(1, Ordering::Relaxed); + self.insert_with_level(xorname, VerificationLevel::ClientPut); } - /// Add a `XorName` verified under the replication (receipt-authenticity) - /// subset only. + /// Add a `XorName` verified under paid-list admission checks. /// - /// Never downgrades an existing client-PUT-verified entry — the stronger - /// verification already happened, and replication re-offers of the same - /// key are routine. - pub fn insert_replication_verified(&self, xorname: XorName) { + /// Never downgrades an existing client-PUT-verified entry. + pub fn insert_paid_list_verified(&self, xorname: XorName) { + self.insert_with_level(xorname, VerificationLevel::PaidList); + } + + fn insert_with_level(&self, xorname: XorName, level: VerificationLevel) { let added = { let mut inner = self.inner.lock(); // `get_mut` refreshes LRU recency for existing entries of either kind. - if inner.get_mut(&xorname).is_none() { - inner.put(xorname, false); - true - } else { + if inner.get(&xorname).is_some() { + if let Some(existing) = inner.get_mut(&xorname) { + if !existing.satisfies(level) { + *existing = level; + } + } false + } else { + inner.put(xorname, level); + true } }; if added { @@ -216,6 +258,29 @@ mod tests { assert_eq!(cache.len(), 2); } + #[test] + fn test_cache_verification_levels_do_not_downgrade_or_over_authorize() { + let cache = VerifiedCache::new(); + let paid_list = [2u8; 32]; + let client_put = [3u8; 32]; + + cache.insert_paid_list_verified(paid_list); + assert!(cache.contains(&paid_list)); + assert!(cache.contains_paid_list_verified(&paid_list)); + assert!(!cache.contains_client_put_verified(&paid_list)); + + cache.insert(paid_list); + assert!(cache.contains_client_put_verified(&paid_list)); + + cache.insert(client_put); + assert!(cache.contains(&client_put)); + assert!(cache.contains_paid_list_verified(&client_put)); + assert!(cache.contains_client_put_verified(&client_put)); + + cache.insert_paid_list_verified(client_put); + assert!(cache.contains_client_put_verified(&client_put)); + } + #[test] fn test_cache_stats() { let cache = VerifiedCache::new(); diff --git a/src/payment/quote.rs b/src/payment/quote.rs index 5a1a44d9..eb5419fe 100644 --- a/src/payment/quote.rs +++ b/src/payment/quote.rs @@ -45,12 +45,11 @@ pub struct QuoteGenerator { /// /// When attached, quote prices are computed from /// [`LmdbStorage::current_chunks()`] — the **same** count the - /// [`PaymentVerifier`](crate::payment::PaymentVerifier) freshness gate - /// compares the quote against. Keeping pricing and freshness on one source - /// means a quote priced at record count `N` is later checked against a - /// current count that differs only by genuine in-flight growth, instead of - /// by the standing client-PUT-vs-replication gap that rejected every - /// payment when pricing read the side counter and freshness read the store. + /// [`PaymentVerifier`](crate::payment::PaymentVerifier) price-floor check + /// compares the paid quote against. Keeping pricing and verification on one + /// source means a quote priced at record count `N` is later checked against + /// a current count that differs only by genuine in-flight growth, instead of + /// by a side-counter-vs-store gap. /// `None` until [`Self::attach_storage`] is called. storage: RwLock>>, /// Signing function provided by the node. @@ -84,10 +83,10 @@ impl QuoteGenerator { /// authoritative on-disk record count. /// /// This MUST be wired to the same `LmdbStorage` the - /// [`PaymentVerifier`](crate::payment::PaymentVerifier) freshness gate reads - /// via `current_chunks()`; otherwise pricing and freshness diverge and the - /// gate rejects healthy payments. Idempotent: calling twice replaces the - /// handle. Uses interior mutability so it can be called on an `Arc`. + /// [`PaymentVerifier`](crate::payment::PaymentVerifier) price-floor check + /// reads via `current_chunks()`; otherwise pricing and verification diverge + /// and healthy payments can be rejected. Idempotent: calling twice replaces + /// the handle. Uses interior mutability so it can be called on an `Arc`. pub fn attach_storage(&self, storage: Arc) { *self.storage.write() = Some(storage); debug!("QuoteGenerator: LmdbStorage attached for current-records pricing"); @@ -97,7 +96,7 @@ impl QuoteGenerator { /// /// Prefers the attached `LmdbStorage` count (authoritative — counts client /// PUTs, replication stores, and repair fetches alike, exactly matching the - /// verifier's freshness source). Falls back to the in-memory + /// verifier's price-floor source). Falls back to the in-memory /// `metrics_tracker` when no storage is attached or the read fails, so /// pricing never panics or stalls. fn pricing_records_stored(&self) -> usize { @@ -184,7 +183,7 @@ impl QuoteGenerator { let timestamp = SystemTime::now(); // Calculate price from the authoritative current record count (the same - // count the verifier's freshness gate reads), falling back to the + // count the verifier's price-floor check reads), falling back to the // in-memory counter only when no storage is attached. let price = calculate_price(self.pricing_records_stored()); @@ -370,13 +369,13 @@ mod tests { generator } - /// Regression test for the STG-01 quote-freshness rejection: pricing must - /// read the attached store's `current_chunks()`, NOT the side counter. + /// Regression test for the STG-01 quote-pricing mismatch: pricing must read + /// the attached store's `current_chunks()`, NOT the side counter. /// /// Before the fix, the price came from `metrics_tracker` (client-PUT count - /// only) while the verifier's freshness gate read `current_chunks()` (all - /// records, including replicated ones). On a replicating network the store - /// count ran far ahead of the side counter, so every quote looked "stale". + /// only) while verifier checks read `current_chunks()` (all records, + /// including replicated ones). On a replicating network the store count ran + /// far ahead of the side counter, so every quote looked underpriced. /// Here we attach a store, write records WITHOUT touching the side counter /// (mimicking replication stores), and assert the quote prices off the /// store count — i.e. the two sources now agree. @@ -441,7 +440,7 @@ mod tests { derive_records_stored_from_price(quote.price), 25, "verifier's price-inverse must recover the store count, keeping the \ - freshness delta at ~0 for a freshly issued quote" + local price comparison aligned for a freshly issued quote" ); } diff --git a/src/payment/verifier.rs b/src/payment/verifier.rs index b310184a..4844d09a 100644 --- a/src/payment/verifier.rs +++ b/src/payment/verifier.rs @@ -11,13 +11,13 @@ use crate::payment::pricing::{calculate_price, derive_records_stored_from_price} use crate::payment::proof::{ deserialize_merkle_proof, deserialize_proof, detect_proof_type, ProofType, }; -use crate::payment::single_node::SingleNodePayment; use crate::storage::lmdb::LmdbStorage; use ant_protocol::payment::verify::{verify_quote_content, verify_quote_signature}; -use evmlib::common::Amount; +use evmlib::common::{Amount, QuoteHash}; use evmlib::contract::payment_vault; use evmlib::merkle_batch_payment::{OnChainPaymentInfo, PoolHash}; use evmlib::Network as EvmNetwork; +use evmlib::PaymentQuote; use evmlib::ProofOfPayment; use evmlib::RewardsAddress; use lru::LruCache; @@ -25,6 +25,8 @@ use parking_lot::{Mutex, RwLock}; use saorsa_core::identity::node_identity::peer_id_from_public_key_bytes; use saorsa_core::identity::PeerId; use saorsa_core::P2PNode; +#[cfg(any(test, feature = "test-utils"))] +use std::collections::HashMap; use std::num::NonZeroUsize; use std::sync::Arc; @@ -42,25 +44,38 @@ pub const MIN_PAYMENT_PROOF_SIZE_BYTES: usize = 32; /// 256 KB provides headroom while still capping memory during verification. pub const MAX_PAYMENT_PROOF_SIZE_BYTES: usize = 262_144; -/// Maximum percentage by which a quote's paid price may fall *below* the node's -/// current price before the quote is rejected as stale. +/// Maximum percentage by which the median-paid quote may fall below this +/// verifier's current local price before a client PUT is rejected. /// -/// The freshness gate is one-directional and price-based, not a symmetric -/// record-count delta: -/// -/// - **Over-payment is always accepted.** If the client paid at least the -/// node's current price (e.g. the node pruned records and is now cheaper), -/// the quote is fine — a node has no reason to reject money. -/// - **Only meaningful under-payment is rejected.** A quote priced below the -/// current price by more than this percentage is rejected as stale. -/// -/// Comparing prices instead of raw record counts makes the tolerance -/// self-scaling against the quadratic pricing curve: at low/moderate fill the -/// curve is nearly flat, so normal in-flight churn (the node storing a handful -/// of replicated records between quoting and verifying) is a negligible price -/// change and passes; at high fill the curve is steep, so the same percentage -/// still catches genuinely stale, underpriced quotes. -const QUOTE_PRICE_STALENESS_PCT_TOLERANCE: u64 = 25; +/// A 20% floor means a paid quote must be at least `0.8 * P_v`, so an +/// attacker who controls a real close-group issuer still pays at least +/// `0.8 * P_v * 3` for an honest verifier. Honest median-paid bundles have +/// a structural majority guarantee: the four nodes at or below the median +/// accept unless their own price grows more than `1 / 0.8 = 1.25x` between +/// quote and PUT. Above-median nodes may reject when `P_v > 1.25 * median`; +/// those records are backfilled by replication, which deliberately skips +/// this present-tense floor. +const PAID_QUOTE_PRICE_FLOOR_TOLERANCE_PCT: u64 = 20; + +const PERCENT_DENOMINATOR: u64 = 100; +const PAID_QUOTE_PAYMENT_MULTIPLIER: u64 = 3; + +#[derive(Clone, Copy)] +struct LegacyMedianCandidate<'a> { + encoded_peer_id: &'a evmlib::EncodedPeerId, + quote: &'a PaymentQuote, + expected_amount: Amount, +} + +fn price_floor(current_price: Amount, tolerance_pct: u64) -> Amount { + current_price.saturating_mul(Amount::from( + PERCENT_DENOMINATOR.saturating_sub(tolerance_pct), + )) / Amount::from(PERCENT_DENOMINATOR) +} + +fn median_quote_index(quote_count: usize) -> usize { + quote_count / 2 +} /// Configuration for EVM payment verification. /// @@ -90,67 +105,45 @@ pub struct PaymentVerifierConfig { pub evm: EvmVerifierConfig, /// Cache capacity (number of `XorName` values to cache). pub cache_capacity: usize, + /// Close-group width used to check paid-quote issuer locality. + pub close_group_size: usize, /// Local node's rewards address. - /// The verifier rejects payments that don't include this node as a recipient. + /// + /// Kept in the verifier config for payment policies that bind receipts to + /// this node's payout address. pub local_rewards_address: RewardsAddress, } -/// The situation a payment proof is being verified in. +/// The fresh admission path a payment proof is being verified for. /// -/// A proof-of-payment is a *receipt*: it records a sale that closed at some -/// earlier moment, at that moment's prices, between the client and the close -/// group of that moment. Two very different callers present receipts: +/// - **`ClientPut`** — the node is admitting a chunk store. The verifier +/// applies store-strength cache semantics and live payment checks. +/// - **`PaidListAdmission`** — the node is admitting fresh paid-list metadata. +/// It runs the same live payment checks as `ClientPut`, but writes a weaker +/// cache entry that does not authorize future chunk stores. /// -/// - **`ClientPut`** — the node is the storer being paid *right now*. Every -/// check applies, including the ones that interrogate the present: "is the -/// price on this receipt still fair for my current fullness?" (own-quote -/// freshness) and "am I actually one of the paid recipients?" (local -/// recipient / merkle candidate closeness). -/// - **`Replication`** — a neighbour is handing over an already-paid record -/// (fresh-write fan-out, paid-notify, repair). The sale closed long ago; the -/// network's job now is to keep the record at target redundancy for the rest -/// of its life. Re-asking the present-tense questions of a receipt is a -/// category error with a guaranteed failure mode: record counts only grow, -/// so every receipt's quoted price eventually drops below the verifier's -/// live floor, and close groups churn, so the receiving node eventually -/// isn't a quoted recipient at all. On DEV-01 (2026-06-05) this rejected -/// nearly 100% of replication proof-of-payment transfers within an hour of -/// launch (4M+ -/// rejections at ~300k/hour), pinned records below target redundancy, and -/// drove a permanent ~500 MB/s fleet-wide re-offer storm. +/// The caller must check local receiver/admission membership before invoking +/// the verifier: direct client PUTs and fresh chunk replication require local +/// close-group responsibility; fresh paid-list replication requires local +/// paid-list close-group membership. The verifier itself only checks payment +/// proof validity and that the paid quote's issuer is in the configured close +/// group for the quoted chunk address. /// -/// Under `Replication` the verifier therefore skips only the -/// storer-being-paid-now checks. Everything that makes the receipt a receipt -/// still runs: quote structure, content binding to this exact address, -/// peer-ID/pub-key bindings, ML-DSA signatures, and the on-chain settlement -/// lookup. A record cannot be admitted via replication without an authentic, -/// settled payment for that record. +/// Immediate fresh chunk replication is different: the receiver is about to +/// store the newly written chunk as if the client PUT it there directly, so +/// that call site deliberately uses `ClientPut`. /// -/// The verified-`XorName` cache is context-aware to match: an entry inserted -/// by a `Replication` verification satisfies later replication lookups but -/// NOT a later `ClientPut` fast-path, so a replication receipt can never let -/// a client PUT bypass the checks this enum gates. -/// -/// Trade-off (deliberate, documented): skipping the recipient/closeness -/// checks for replication means a payer who self-deals — minting a quote pool -/// from peers they control and settling the median payment to their own -/// wallet on-chain — can present that receipt to honest nodes via the -/// replication protocol, paying only gas plus a recycled self-payment instead -/// of paying real storers. The client-PUT path still rejects such pools, and -/// replication admission still requires the receiving node to be responsible -/// for the key, so the abuse costs a settled on-chain payment per chunk and -/// buys only what storage already costs; closing it properly belongs in quote -/// issuance / payment policy, not in the replication hot path, where the -/// equivalent defence provably destroys the network's ability to heal. +/// Later neighbour-sync repair does not include proof-of-payment bytes and +/// does not call this verifier. It authorizes repair from network evidence: +/// majority storage among the configured close group, or majority paid-list +/// membership among the closest K. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum VerificationContext { - /// The node is the storer being paid right now: all checks apply. + /// The node is admitting a chunk store with store-strength cache semantics. ClientPut, - /// An already-settled receipt presented during replication/repair: skip - /// the storer-being-paid-now checks (own-quote price freshness, local - /// recipient, merkle candidate closeness); keep all receipt-authenticity - /// checks. - Replication, + /// The node is admitting fresh paid-list metadata with paid-list-strength + /// cache semantics. + PaidListAdmission, } /// Status returned by payment verification. @@ -202,30 +195,36 @@ pub struct PaymentVerifier { /// amplification to one lookup per unique `pool_hash` regardless of /// concurrency. inflight_closeness: Mutex>>, - /// P2P node handle, attached post-construction so merkle verification can - /// check that candidate `pub_keys` map to peers actually close to the pool - /// midpoint in the live DHT. `None` in unit tests that don't exercise - /// merkle verification; production startup MUST call [`attach_p2p_node`]. + /// P2P node handle, attached post-construction so paid-quote verification + /// can check issuer closeness, and merkle verification can check that + /// candidate `pub_keys` map to peers actually close to the pool midpoint + /// in the live DHT. `None` in unit tests that don't exercise live-DHT + /// checks; production startup MUST call [`attach_p2p_node`]. p2p_node: RwLock>>, - /// LMDB storage handle, attached post-construction so the storage-delta - /// freshness check can read the authoritative on-disk record count without + /// LMDB storage handle, attached post-construction so the paid-quote + /// price-floor check can read the authoritative on-disk record count without /// depending on a side counter that may drift from replication/repair/prune /// paths. `None` in unit tests that pre-set [`Self::test_records_override`]; /// production startup MUST call [`attach_storage`]. storage: RwLock>>, - /// Test-only override for the storage-delta freshness check. + /// Test-only override for the paid-quote local price floor. /// - /// When `Some(n)`, `validate_quote_freshness` uses `n` as the current - /// record count instead of querying `storage.current_chunks()`. Set via + /// When `Some(n)`, `validate_paid_quote_price_floor` uses `n` as the + /// current record count instead of querying `storage.current_chunks()`. Set via /// [`Self::set_records_stored_for_tests`] so unit tests that don't wire a - /// real `LmdbStorage` can still drive the freshness logic. + /// real `LmdbStorage` can still drive the price-floor logic. test_records_override: RwLock>, - /// Test-only override for this node's own peer ID, used by - /// `validate_quote_freshness` to pick out the node's own quote from the - /// payment bundle. Production code derives it from the attached - /// [`P2PNode`]; set via [`Self::set_peer_id_for_tests`] so unit tests can - /// drive the freshness logic without wiring a real `P2PNode`. - test_peer_id_override: RwLock>, + /// Test-only override for the paid-quote issuer close-group check. + /// + /// Production code derives closest peers from the attached [`P2PNode`]. + #[cfg(any(test, feature = "test-utils"))] + test_paid_quote_close_group_override: RwLock>>, + /// Test-only override for `completedPayments(quote_hash)`. + /// + /// Production always queries the payment vault; unit tests use this to + /// exercise the full verifier path without starting an EVM chain. + #[cfg(any(test, feature = "test-utils"))] + test_completed_payments_override: RwLock>, /// Configuration. config: PaymentVerifierConfig, } @@ -325,13 +324,13 @@ impl PaymentVerifier { info!("Payment verifier initialized (cache_capacity={cache_capacity}, evm=always-on, pool_cache={DEFAULT_POOL_CACHE_CAPACITY})"); // Loud warning if a production binary was accidentally built with - // `test-utils`: that feature flips the closeness-check fail-open - // switch, disabling the pay-yourself defence when P2PNode isn't - // attached. Safe in tests, never intended for prod. + // `test-utils`: that feature flips the live-DHT payment-check + // fail-open switches when P2PNode isn't attached. Safe in tests, never + // intended for prod. #[cfg(feature = "test-utils")] crate::logging::error!( - "PaymentVerifier: built with `test-utils` feature — merkle closeness \ - defence falls back to fail-open when no P2PNode is attached. This \ + "PaymentVerifier: built with `test-utils` feature — payment live-DHT \ + checks fall back to fail-open when no P2PNode is attached. This \ feature is for test binaries only; production nodes must be built \ without it." ); @@ -344,38 +343,48 @@ impl PaymentVerifier { p2p_node: RwLock::new(None), storage: RwLock::new(None), test_records_override: RwLock::new(None), - test_peer_id_override: RwLock::new(None), + #[cfg(any(test, feature = "test-utils"))] + test_paid_quote_close_group_override: RwLock::new(None), + #[cfg(any(test, feature = "test-utils"))] + test_completed_payments_override: RwLock::new(HashMap::new()), config, } } - /// Attach the node's [`P2PNode`] handle so merkle-payment verification can - /// check candidate `pub_keys` against the DHT's actual closest peers to the - /// pool midpoint. + /// Attach the node's [`P2PNode`] handle so paid-quote verification can + /// check issuer closeness, and merkle-payment verification can check + /// candidate `pub_keys` against the DHT's actual closest peers to the pool + /// midpoint. /// /// Production startup MUST call this once the `P2PNode` exists. Without - /// it, the closeness check fails CLOSED in release builds (rejects the - /// PUT with a visible error) and fails open in test builds. Idempotent: - /// calling twice replaces the handle. + /// it, live-DHT payment checks fail CLOSED in release builds with a visible + /// error and fail open in test builds. Idempotent: calling twice replaces + /// the handle. pub fn attach_p2p_node(&self, node: Arc) { *self.p2p_node.write() = Some(node); - debug!("PaymentVerifier: P2PNode attached for merkle closeness checks"); + debug!("PaymentVerifier: P2PNode attached for payment live-DHT checks"); + } + + /// Configured close-group width used by payment proof admission callers. + #[must_use] + pub fn close_group_size(&self) -> usize { + self.config.close_group_size } - /// Attach the node's [`LmdbStorage`] handle so storage-delta freshness + /// Attach the node's [`LmdbStorage`] handle so paid-quote price-floor /// checks can query the authoritative on-disk record count. /// /// Production startup MUST call this once the storage exists; otherwise - /// `validate_quote_freshness` falls back to treating the current count as - /// zero, which will reject all non-trivial quotes. Idempotent: calling - /// twice replaces the handle. + /// client PUTs using paid-quote verification are rejected because + /// the local economic floor cannot be checked. Idempotent: calling twice + /// replaces the handle. pub fn attach_storage(&self, storage: Arc) { *self.storage.write() = Some(storage); - debug!("PaymentVerifier: LmdbStorage attached for storage-delta freshness checks"); + debug!("PaymentVerifier: LmdbStorage attached for paid-quote price-floor checks"); } - /// Test-only setter for the current record count used by storage-delta - /// freshness checks. Lets unit tests drive the freshness logic without + /// Test-only setter for the current record count used by paid-quote + /// price-floor checks. Lets unit tests drive the floor logic without /// wiring a real `LmdbStorage`. Has no effect in production code because /// production code is expected to call [`Self::attach_storage`] instead. #[cfg(any(test, feature = "test-utils"))] @@ -383,45 +392,44 @@ impl PaymentVerifier { *self.test_records_override.write() = Some(count); } - /// Test-only setter for the node's own peer ID used by the quote - /// freshness check. Lets unit tests mark which quote in a payment bundle - /// is "ours" without wiring a real `P2PNode`. Has no effect in production - /// code because production code is expected to call - /// [`Self::attach_p2p_node`] instead. + /// Test-only setter for local closest peers used by the paid-quote + /// issuer close-group check. #[cfg(any(test, feature = "test-utils"))] - pub fn set_peer_id_for_tests(&self, peer_id_bytes: [u8; 32]) { - *self.test_peer_id_override.write() = Some(peer_id_bytes); + pub fn set_paid_quote_close_group_for_tests(&self, peer_ids: Vec<[u8; 32]>) { + *self.test_paid_quote_close_group_override.write() = Some(peer_ids); } - /// Snapshot this node's own peer ID for the quote freshness check. - /// - /// Prefers the attached [`P2PNode`] (authoritative). Falls back to a test - /// override if one was set. Returns `None` only when no source is - /// available (mis-configured production startup); the caller treats that - /// as "unknown" and skips the freshness gate rather than rejecting — the - /// same fail-open posture as a missing record-count source. - fn self_peer_id_bytes(&self) -> Option<[u8; 32]> { - if let Some(node) = self.p2p_node.read().as_ref() { - return Some(*node.peer_id().as_bytes()); - } - *self.test_peer_id_override.read() + /// Compatibility alias for older tests that called this the known-peer + /// set. The check is now specifically the configured close group for the + /// quoted chunk address. + #[cfg(any(test, feature = "test-utils"))] + pub fn set_paid_quote_known_peers_for_tests(&self, peer_ids: Vec<[u8; 32]>) { + self.set_paid_quote_close_group_for_tests(peer_ids); + } + + /// Test-only setter for an on-chain completed payment amount. + #[cfg(any(test, feature = "test-utils"))] + pub fn set_completed_payment_for_tests(&self, quote_hash: QuoteHash, amount: Amount) { + self.test_completed_payments_override + .write() + .insert(quote_hash, amount); } - /// Snapshot the current record count for freshness comparisons. + /// Snapshot the current record count for paid-quote price-floor checks. /// /// Prefers the attached `LmdbStorage` (authoritative — covers client PUTs, /// replication stores, repair fetches, and prune deletes by definition). /// Falls back to a test override if one was set. Returns `None` only when - /// no source is available (mis-configured production startup); the caller - /// treats that as "unknown" and skips storage-delta gating rather than - /// rejecting all quotes outright. + /// no source is available (mis-configured production startup). The + /// paid-quote floor rejects client PUTs because the local floor is + /// the economic security gate for this proof policy. fn current_records_stored(&self) -> Option { if let Some(storage) = self.storage.read().as_ref() { match storage.current_chunks() { Ok(n) => return Some(n), Err(e) => { warn!( - "PaymentVerifier: failed to read current_chunks() for freshness check: {e}" + "PaymentVerifier: failed to read current_chunks() for price-floor check: {e}" ); return None; } @@ -436,11 +444,9 @@ impl PaymentVerifier { /// 1. Check LRU cache (fast path) /// 2. If not cached, payment is required /// - /// The fast path is context-aware: a `ClientPut` lookup is satisfied only - /// by an entry whose verification ran the full client-PUT check set. An - /// entry inserted by a `Replication` verification (which skips the - /// storer-being-paid-now checks) must not let a later client PUT bypass - /// those checks. A `Replication` lookup accepts either kind of entry. + /// The fast path is context-aware. A `ClientPut` lookup is satisfied only + /// by a close-group store verification. A `PaidListAdmission` lookup is + /// satisfied by either a paid-list or client-PUT verification. /// /// # Arguments /// @@ -459,7 +465,9 @@ impl PaymentVerifier { // Check LRU cache (fast path) let cached = match context { VerificationContext::ClientPut => self.cache.contains_client_put_verified(xorname), - VerificationContext::Replication => self.cache.contains(xorname), + VerificationContext::PaidListAdmission => { + self.cache.contains_paid_list_verified(xorname) + } }; if cached { if crate::logging::enabled!(crate::logging::Level::DEBUG) { @@ -488,9 +496,8 @@ impl PaymentVerifier { /// /// * `xorname` - The content-addressed name of the data /// * `payment_proof` - Optional payment proof (required if not in cache) - /// * `context` - Whether the proof backs a live client PUT or an - /// already-settled receipt presented during replication — see - /// [`VerificationContext`] for which checks each context runs + /// * `context` - Which fresh admission path is verifying the proof — see + /// [`VerificationContext`] for cache-strength semantics /// /// # Returns /// @@ -562,15 +569,13 @@ impl PaymentVerifier { } } - // Cache the verified xorname, recording which check set - // ran. A Replication-verified entry satisfies later - // replication lookups (re-offers of the same key are - // routine) but not a later ClientPut fast-path — the - // context-gated checks were never run for it. + // Cache the verified xorname at the context's verification + // strength. Stronger entries satisfy weaker future lookups, + // but not the reverse. match context { VerificationContext::ClientPut => self.cache.insert(*xorname), - VerificationContext::Replication => { - self.cache.insert_replication_verified(*xorname); + VerificationContext::PaidListAdmission => { + self.cache.insert_paid_list_verified(*xorname); } } @@ -624,24 +629,21 @@ impl PaymentVerifier { /// Verify a single-node EVM payment proof. /// /// Verification steps: - /// 1. Exactly `CLOSE_GROUP_SIZE` quotes are present - /// 2. All quotes target the correct content address (xorname binding) - /// 3. This node's own quote price is fresh (`ClientPut` only — a - /// replication receipt's price was fixed at the original sale and the - /// node's record count has legitimately grown since) - /// 4. Peer ID bindings match the ML-DSA-65 public keys - /// 5. This node is among the quoted recipients (`ClientPut` only — a - /// post-churn close-group member receiving a record via replication - /// was never a payee on the original receipt) - /// 6. All ML-DSA-65 signatures are valid (offloaded to `spawn_blocking`) - /// 7. The median-priced quote was paid at least 3x its price on-chain - /// (looked up via `completedPayments(quoteHash)` on the payment vault) - /// - /// See [`VerificationContext`] for why steps 3 and 5 are context-gated. + /// 1. Between 1 and `CLOSE_GROUP_SIZE` quotes are present + /// 2. Median-priced candidate quotes are derived from the supplied bundle + /// 3. Each candidate is checked for content binding, peer binding, and a + /// valid ML-DSA-65 signature + /// 4. Each candidate must also come from a local close-group peer and + /// satisfy the paid-quote price floor + /// 5. A candidate is accepted only if `completedPayments(quoteHash)` is at + /// least 3x the median price /// - /// For unit tests that don't need on-chain verification, pre-populate - /// the cache so `verify_payment` returns `CachedAsVerified` before - /// reaching this method. + /// Non-median quotes are parsed only to locate the median. Their content, + /// peer bindings, and signatures are deliberately ignored: the paid + /// quote's content hash, quote hash, signature, local floor, issuer + /// close-group + /// check, and on-chain settlement are the authority. A one-quote proof is + /// valid when that single quote passes these checks and was paid 3x. async fn verify_evm_payment( &self, xorname: &XorName, @@ -657,233 +659,305 @@ impl PaymentVerifier { } Self::validate_quote_structure(payment)?; - Self::validate_quote_content(payment, xorname)?; - if context == VerificationContext::ClientPut { - self.validate_quote_freshness(payment)?; - } - Self::validate_peer_bindings(payment)?; - if context == VerificationContext::ClientPut { - self.validate_local_recipient(payment)?; - } + let candidates = Self::legacy_median_candidates(payment)?; + let mut failures = Vec::with_capacity(candidates.len()); + let mut verified_paid_quote = false; - // Verify quote signatures (CPU-bound, run off async runtime) - let peer_quotes = payment.peer_quotes.clone(); - tokio::task::spawn_blocking(move || { - for (encoded_peer_id, quote) in &peer_quotes { - if !verify_quote_signature(quote) { - return Err(Error::Payment( - format!("Quote ML-DSA-65 signature verification failed for peer {encoded_peer_id:?}"), - )); + for candidate in candidates { + match self + .verify_legacy_median_candidate(xorname, candidate) + .await + { + Ok(()) => { + verified_paid_quote = true; + break; } + Err(err) => failures.push(err.to_string()), } - Ok(()) - }) - .await - .map_err(|e| Error::Payment(format!("Signature verification task failed: {e}")))??; + } + + if !verified_paid_quote { + let xorname_hex = hex::encode(xorname); + let details = if failures.is_empty() { + "no median-priced candidates were available".to_string() + } else { + failures.join("; ") + }; + return Err(Error::Payment(format!( + "Median quote payment verification failed for {xorname_hex}: {details}" + ))); + } + + if crate::logging::enabled!(crate::logging::Level::INFO) { + let xorname_hex = hex::encode(xorname); + info!("EVM payment verified for {xorname_hex}"); + } + Ok(()) + } - // Reconstruct the SingleNodePayment to identify the median quote. - // from_quotes() sorts by price and marks the median for 3x payment. - let quotes_with_prices: Vec<_> = payment + fn legacy_median_candidates( + payment: &ProofOfPayment, + ) -> Result>> { + let mut sorted_quotes: Vec<(&evmlib::EncodedPeerId, &PaymentQuote)> = payment .peer_quotes .iter() - .map(|(_, quote)| (quote.clone(), quote.price)) + .map(|(encoded_peer_id, quote)| (encoded_peer_id, quote)) .collect(); - let single_payment = SingleNodePayment::from_quotes(quotes_with_prices).map_err(|e| { - Error::Payment(format!( - "Failed to reconstruct payment for verification: {e}" - )) - })?; - - // Verify the median quote was paid at least 3x its price on-chain - // via completedPayments(quoteHash) on the payment vault contract. - let verified_amount = single_payment - .verify(&self.config.evm.network) - .await - .map_err(|e| { - let xorname_hex = hex::encode(xorname); + sorted_quotes.sort_by_key(|(_, quote)| quote.price); + let quote_count = sorted_quotes.len(); + let median_index = median_quote_index(quote_count); + let median_price = sorted_quotes + .get(median_index) + .ok_or_else(|| { + Error::Payment(format!("Missing paid quote at median index {median_index}")) + })? + .1 + .price; + let expected_amount = median_price + .checked_mul(Amount::from(PAID_QUOTE_PAYMENT_MULTIPLIER)) + .ok_or_else(|| { Error::Payment(format!( - "Median quote payment verification failed for {xorname_hex}: {e}" + "Median quote payment amount overflow for price {median_price}" )) })?; - if crate::logging::enabled!(crate::logging::Level::INFO) { - let xorname_hex = hex::encode(xorname); - info!("EVM payment verified for {xorname_hex} (median paid {verified_amount} atto)"); + if expected_amount == Amount::ZERO || median_price == Amount::ZERO { + return Err(Error::Payment(format!( + "Median quote has zero price/amount (price={median_price}, amount={expected_amount}); refusing to verify as paid" + ))); } - Ok(()) + + Ok(sorted_quotes + .into_iter() + .filter(|(_, quote)| quote.price == median_price) + .map(|(encoded_peer_id, quote)| LegacyMedianCandidate { + encoded_peer_id, + quote, + expected_amount, + }) + .collect()) } - /// Validate quote count, uniqueness, and basic structure. - fn validate_quote_structure(payment: &ProofOfPayment) -> Result<()> { - if payment.peer_quotes.is_empty() { - return Err(Error::Payment("Payment has no quotes".to_string())); + async fn verify_legacy_median_candidate( + &self, + xorname: &XorName, + candidate: LegacyMedianCandidate<'_>, + ) -> Result<()> { + Self::validate_paid_quote_content(xorname, candidate)?; + let issuer_peer_id = + Self::validate_paid_quote_peer_binding(candidate.encoded_peer_id, candidate.quote)?; + + self.validate_paid_quote_issuer_close_group(xorname, &issuer_peer_id) + .await?; + self.validate_paid_quote_price_floor(candidate.quote)?; + + Self::validate_paid_quote_signature(candidate).await?; + + let on_chain_amount = self + .completed_payment_amount(candidate.quote.hash()) + .await?; + if on_chain_amount >= candidate.expected_amount { + return Ok(()); } - let quote_count = payment.peer_quotes.len(); - if quote_count != CLOSE_GROUP_SIZE { - return Err(Error::Payment(format!( - "Payment must have exactly {CLOSE_GROUP_SIZE} quotes, got {quote_count}" - ))); + Err(Error::Payment(format!( + "Median-priced quote for peer {:?} was not paid enough: expected at least {}, got {on_chain_amount}", + candidate.encoded_peer_id, candidate.expected_amount + ))) + } + + fn validate_paid_quote_content( + xorname: &XorName, + candidate: LegacyMedianCandidate<'_>, + ) -> Result<()> { + if verify_quote_content(candidate.quote, xorname) { + return Ok(()); } - let mut seen: Vec<&evmlib::EncodedPeerId> = Vec::with_capacity(quote_count); - for (encoded_peer_id, _) in &payment.peer_quotes { - if seen.contains(&encoded_peer_id) { + let expected_hex = hex::encode(xorname); + let actual_hex = hex::encode(candidate.quote.content.0); + Err(Error::Payment(format!( + "Paid quote content address mismatch for peer {:?}: expected {expected_hex}, got {actual_hex}", + candidate.encoded_peer_id + ))) + } + + async fn validate_paid_quote_signature(candidate: LegacyMedianCandidate<'_>) -> Result<()> { + let quote_for_signature = candidate.quote.clone(); + let peer_id_for_error = candidate.encoded_peer_id.clone(); + tokio::task::spawn_blocking(move || { + if !verify_quote_signature("e_for_signature) { return Err(Error::Payment(format!( - "Duplicate peer ID in payment quotes: {encoded_peer_id:?}" + "Paid quote ML-DSA-65 signature verification failed for peer {peer_id_for_error:?}" ))); } - seen.push(encoded_peer_id); + Ok(()) + }) + .await + .map_err(|e| Error::Payment(format!("Signature verification task failed: {e}")))? + } + + async fn completed_payment_amount(&self, quote_hash: QuoteHash) -> Result { + #[cfg(any(test, feature = "test-utils"))] + { + let completed_payment_override = { + self.test_completed_payments_override + .read() + .get("e_hash) + .copied() + }; + if let Some(amount) = completed_payment_override { + return Ok(amount); + } } - Ok(()) + let provider = evmlib::utils::http_provider(self.config.evm.network.rpc_url().clone()); + let vault_address = *self.config.evm.network.payment_vault_address(); + let contract = payment_vault::interface::IPaymentVault::new(vault_address, provider); + + let result = contract + .completedPayments(quote_hash) + .call() + .await + .map_err(|e| Error::Payment(format!("completedPayments lookup failed: {e}")))?; + + Ok(Amount::from(result.amount)) } - /// Verify all quotes target the correct content address. - fn validate_quote_content(payment: &ProofOfPayment, xorname: &XorName) -> Result<()> { - for (encoded_peer_id, quote) in &payment.peer_quotes { - if !verify_quote_content(quote, xorname) { - let expected_hex = hex::encode(xorname); - let actual_hex = hex::encode(quote.content.0); - return Err(Error::Payment(format!( - "Quote content address mismatch for peer {encoded_peer_id:?}: expected {expected_hex}, got {actual_hex}" - ))); - } + fn validate_paid_quote_peer_binding( + encoded_peer_id: &evmlib::EncodedPeerId, + quote: &PaymentQuote, + ) -> Result { + let expected_peer_id = peer_id_from_public_key_bytes("e.pub_key) + .map_err(|e| Error::Payment(format!("Invalid ML-DSA public key in quote: {e}")))?; + + if expected_peer_id.as_bytes() != encoded_peer_id.as_bytes() { + let expected_hex = expected_peer_id.to_hex(); + let actual_hex = hex::encode(encoded_peer_id.as_bytes()); + return Err(Error::Payment(format!( + "Paid quote pub_key does not belong to claimed peer {encoded_peer_id:?}: \ + BLAKE3(pub_key) = {expected_hex}, peer_id = {actual_hex}" + ))); } - Ok(()) + + Ok(expected_peer_id) } - /// Verify quote freshness by price staleness, not wall-clock time and not a - /// symmetric record-count delta. - /// - /// The quote price encodes the quoting node's record count via the quadratic - /// pricing formula. We compute the price the node would charge *now* for its - /// current fullness and reject the quote only if the client under-paid that - /// current price by more than [`QUOTE_PRICE_STALENESS_PCT_TOLERANCE`]. This: - /// - /// - removes the platform clock dependency that caused Windows/UTC false - /// rejections (timestamps are deliberately unused); - /// - never rejects an over-payment (the previous symmetric `abs_diff` check - /// rejected quotes where the node had *fewer* records than when it quoted, - /// i.e. the client paid for a fuller, pricier node — nonsensical to - /// reject); and - /// - self-scales with the pricing curve, so benign in-flight churn (a node - /// storing a few replicated records between quoting and verifying) — a - /// negligible price move where the curve is flat — no longer rejects an - /// otherwise-valid payment. On a fresh, rapidly-filling testnet that churn - /// routinely exceeded the old fixed 5-record tolerance and rejected ~100% - /// of uploads via the multiplicative per-chunk effect. - /// - /// The current record count comes from the attached [`LmdbStorage`] via - /// `current_chunks()` — an O(1) B-tree page-header read, authoritative - /// regardless of which path stored the record (client PUT, replication - /// store, repair fetch) or removed it (prune delete). If no storage source - /// is available (mis-configured production startup, or a unit test that - /// didn't set a test override), the gate is skipped entirely rather than - /// rejecting every quote — see [`Self::current_records_stored`]. - /// - /// **Only this node's own quote is gated.** A bundle contains one quote - /// per close-group peer, and fullness across a close group is wildly - /// heterogeneous on a real network (a freshly joined node holds tens of - /// records while an established neighbour holds thousands). Comparing a - /// *neighbour's* quote price against *this node's* record count therefore - /// rejects honest payments whenever the group spans more than the - /// tolerance — on ant-prod-01 a close group spanning 47..=1788 records - /// made the three fullest nodes reject every bundle containing the - /// emptiest node's (perfectly fresh, 10-second-old) quote, failing the - /// PUT after the client had already paid on-chain. The node can only - /// re-derive *its own* price from its own record count, so its own quote - /// is the only one it can legitimately call stale. Replay of another - /// node's old cheap quote is that node's gate to enforce when the PUT - /// reaches it; the on-chain median payment binding is unaffected either - /// way. - /// - /// A bundle holds at most one quote per peer — [`Self::validate_quote_structure`] - /// rejects duplicate peer IDs and runs before this gate on every path — - /// so the loop below matches at most one own quote. - fn validate_quote_freshness(&self, payment: &ProofOfPayment) -> Result<()> { + fn validate_paid_quote_price_floor(&self, quote: &PaymentQuote) -> Result<()> { let Some(current_records) = self.current_records_stored() else { - debug!( - "PaymentVerifier: no record-count source attached; skipping \ - quote price-staleness check" - ); - return Ok(()); - }; - - let Some(self_peer_id) = self.self_peer_id_bytes() else { - debug!( - "PaymentVerifier: no self peer-id source attached; skipping \ - quote price-staleness check" - ); - return Ok(()); + return Err(Error::Payment( + "PaymentVerifier: no record-count source attached; cannot verify \ + paid-quote local price floor" + .to_string(), + )); }; - // The price the node would charge right now for its current fullness, - // and the floor a quote may not drop below (one-directional: paying at - // or above `current_price` is always accepted). let current_price = calculate_price(usize::try_from(current_records).unwrap_or(usize::MAX)); - let min_acceptable_price = current_price.saturating_mul(Amount::from( - 100u64.saturating_sub(QUOTE_PRICE_STALENESS_PCT_TOLERANCE), - )) / Amount::from(100u64); - - let mut own_quote_seen = false; - for (encoded_peer_id, quote) in &payment.peer_quotes { - if encoded_peer_id.as_bytes() != &self_peer_id { - // A neighbour's quote prices the *neighbour's* fullness; this - // node has no basis to judge it against its own record count. - continue; - } - own_quote_seen = true; - if quote.price < min_acceptable_price { - let quoted_records = derive_records_stored_from_price(quote.price); - return Err(Error::Payment(format!( - "Own quote {encoded_peer_id:?} stale: quoted price encodes \ - {quoted_records} records but node currently holds {current_records} \ - (quoted {}, minimum acceptable {min_acceptable_price} at \ - {QUOTE_PRICE_STALENESS_PCT_TOLERANCE}% under-payment tolerance)", - quote.price - ))); - } + let min_acceptable_price = price_floor(current_price, PAID_QUOTE_PRICE_FLOOR_TOLERANCE_PCT); + + if quote.price < min_acceptable_price { + let quoted_records = derive_records_stored_from_price(quote.price); + return Err(Error::Payment(format!( + "Paid quote price below local floor: quoted price encodes \ + {quoted_records} records but node currently holds {current_records} \ + (quoted {}, minimum acceptable {min_acceptable_price} at \ + {PAID_QUOTE_PRICE_FLOOR_TOLERANCE_PCT}% under-payment tolerance)", + quote.price + ))); } - // Two self-identity notions coexist in this verifier and are expected - // to refer to the same node: `validate_local_recipient` matches "us" - // by rewards address, this gate by peer ID. They legitimately diverge - // when a PUT reaches a node whose own quote isn't in the bundle but - // whose rewards address is shared with a quoted sibling (common in - // fleet deployments). The gate fail-opens in that case — leave a - // breadcrumb, because a silent no-op is exactly what makes a - // production incident hard to reconstruct from node logs. - if !own_quote_seen { - let our_rewards_address_quoted = payment - .peer_quotes + Ok(()) + } + + async fn validate_paid_quote_issuer_close_group( + &self, + xorname: &XorName, + issuer_peer_id: &PeerId, + ) -> Result<()> { + #[cfg(any(test, feature = "test-utils"))] + if let Some(close_group_peer_ids) = + self.test_paid_quote_close_group_override.read().as_ref() + { + if close_group_peer_ids .iter() - .any(|(_, quote)| quote.rewards_address == self.config.local_rewards_address); - if our_rewards_address_quoted { - debug!( - "PaymentVerifier: bundle contains our rewards address but no quote \ - under our peer ID; skipping quote price-staleness check" + .any(|peer_id| peer_id == issuer_peer_id.as_bytes()) + { + return Ok(()); + } + let close_group_size = self.config.close_group_size; + return Err(Error::Payment(format!( + "Paid quote issuer {} is not among this node's local {close_group_size} closest peers for {}", + issuer_peer_id.to_hex(), + hex::encode(xorname) + ))); + } + + let attached = self.p2p_node.read().as_ref().map(Arc::clone); + let Some(p2p_node) = attached else { + #[cfg(any(test, feature = "test-utils"))] + { + crate::logging::warn!( + "PaymentVerifier: no P2PNode attached; paid-quote issuer \ + close-group check SKIPPED (test build). Production startup MUST call \ + PaymentVerifier::attach_p2p_node." + ); + return Ok(()); + } + #[cfg(not(any(test, feature = "test-utils")))] + { + crate::logging::error!( + "PaymentVerifier: no P2PNode attached; rejecting paid-quote \ + payment. This is a node-startup bug — \ + PaymentVerifier::attach_p2p_node must be called before \ + any PUT handler runs." ); + return Err(Error::Payment( + "Paid quote rejected: verifier is not wired to the P2P \ + layer; cannot verify issuer closeness." + .into(), + )); } + }; + + let close_group_size = self.config.close_group_size; + let closest = p2p_node + .dht_manager() + .find_closest_nodes_local_with_self(xorname, close_group_size) + .await; + if closest.iter().any(|node| node.peer_id == *issuer_peer_id) { + return Ok(()); } - Ok(()) + + Err(Error::Payment(format!( + "Paid quote issuer {} is not among this node's local {close_group_size} closest peers for {}", + issuer_peer_id.to_hex(), + hex::encode(xorname) + ))) } - /// Verify each quote's `pub_key` matches the claimed peer ID via BLAKE3. - fn validate_peer_bindings(payment: &ProofOfPayment) -> Result<()> { - for (encoded_peer_id, quote) in &payment.peer_quotes { - let expected_peer_id = peer_id_from_public_key_bytes("e.pub_key) - .map_err(|e| Error::Payment(format!("Invalid ML-DSA public key in quote: {e}")))?; + /// Validate quote count, uniqueness, and basic structure. + fn validate_quote_structure(payment: &ProofOfPayment) -> Result<()> { + if payment.peer_quotes.is_empty() { + return Err(Error::Payment("Payment has no quotes".to_string())); + } + + let quote_count = payment.peer_quotes.len(); + if quote_count > CLOSE_GROUP_SIZE { + return Err(Error::Payment(format!( + "Payment must have at most {CLOSE_GROUP_SIZE} quotes, got {quote_count}" + ))); + } - if expected_peer_id.as_bytes() != encoded_peer_id.as_bytes() { - let expected_hex = expected_peer_id.to_hex(); - let actual_hex = hex::encode(encoded_peer_id.as_bytes()); + let mut seen: Vec<&evmlib::EncodedPeerId> = Vec::with_capacity(quote_count); + for (encoded_peer_id, _) in &payment.peer_quotes { + if seen.contains(&encoded_peer_id) { return Err(Error::Payment(format!( - "Quote pub_key does not belong to claimed peer {encoded_peer_id:?}: \ - BLAKE3(pub_key) = {expected_hex}, peer_id = {actual_hex}" + "Duplicate peer ID in payment quotes: {encoded_peer_id:?}" ))); } + seen.push(encoded_peer_id); } + Ok(()) } @@ -1425,16 +1499,8 @@ impl PaymentVerifier { // single-flight keyed on pool_hash collapse the Kademlia lookup cost // within a batch and across concurrent PUTs for the same pool. // - // ClientPut only: the check interrogates the *live* DHT, but a - // replication receipt's winner pool was sampled from the DHT of the - // original sale. Churn guarantees old pools eventually stop matching - // the current top-K, which would make old records unreplicatable — - // the same failure mode the single-node freshness gate caused on - // DEV-01. See `VerificationContext` for the trade-off discussion. - if context == VerificationContext::ClientPut { - self.verify_merkle_candidate_closeness(&merkle_proof.winner_pool, pool_hash) - .await?; - } + self.verify_merkle_candidate_closeness(&merkle_proof.winner_pool, pool_hash) + .await?; // Check pool cache first let cached_info = { @@ -1604,21 +1670,6 @@ impl PaymentVerifier { Ok(()) } - - /// Verify this node is among the paid recipients. - fn validate_local_recipient(&self, payment: &ProofOfPayment) -> Result<()> { - let local_addr = &self.config.local_rewards_address; - let is_recipient = payment - .peer_quotes - .iter() - .any(|(_, quote)| quote.rewards_address == *local_addr); - if !is_recipient { - return Err(Error::Payment( - "Payment proof does not include this node as a recipient".to_string(), - )); - } - Ok(()) - } } #[cfg(test)] @@ -1626,6 +1677,10 @@ impl PaymentVerifier { mod tests { use super::*; use evmlib::merkle_payments::MerklePaymentCandidatePool; + use evmlib::PaymentQuote; + use saorsa_core::MlDsa65; + use saorsa_pqc::pqc::types::MlDsaSecretKey; + use saorsa_pqc::pqc::MlDsaOperations; use std::time::SystemTime; /// Create a verifier for unit tests. EVM is always on, but tests can @@ -1634,11 +1689,132 @@ mod tests { let config = PaymentVerifierConfig { evm: EvmVerifierConfig::default(), cache_capacity: 100, + close_group_size: CLOSE_GROUP_SIZE, local_rewards_address: RewardsAddress::new([1u8; 20]), }; PaymentVerifier::new(config) } + fn make_signed_quote( + xorname: XorName, + price: Amount, + rewards_seed: u8, + ) -> (evmlib::EncodedPeerId, PaymentQuote) { + let ml_dsa = MlDsa65::new(); + let (public_key, secret_key) = ml_dsa.generate_keypair().expect("keygen"); + let pub_key_bytes = public_key.as_bytes().to_vec(); + let peer_id = encoded_peer_id_for_pub_key(&pub_key_bytes); + let mut quote = PaymentQuote { + content: xor_name::XorName(xorname), + timestamp: SystemTime::now(), + price, + rewards_address: RewardsAddress::new([rewards_seed; 20]), + pub_key: pub_key_bytes, + signature: Vec::new(), + }; + let secret_key = MlDsaSecretKey::from_bytes(secret_key.as_bytes()).expect("secret key"); + quote.signature = ml_dsa + .sign(&secret_key, "e.bytes_for_sig()) + .expect("sign quote") + .as_bytes() + .to_vec(); + (peer_id, quote) + } + + fn make_signed_legacy_bundle( + xorname: XorName, + prices: [Amount; CLOSE_GROUP_SIZE], + ) -> Vec<(evmlib::EncodedPeerId, PaymentQuote)> { + prices + .into_iter() + .enumerate() + .map(|(index, price)| { + let rewards_seed = u8::try_from(index + 1).expect("small test index"); + make_signed_quote(xorname, price, rewards_seed) + }) + .collect() + } + + fn price_at_records(records: usize) -> Amount { + crate::payment::pricing::calculate_price(records) + } + + fn unique_test_prices() -> [Amount; CLOSE_GROUP_SIZE] { + [ + price_at_records(0), + price_at_records(1), + price_at_records(2), + price_at_records(3), + price_at_records(4), + price_at_records(5), + price_at_records(6), + ] + } + + fn tied_median_test_prices() -> [Amount; CLOSE_GROUP_SIZE] { + [ + price_at_records(0), + price_at_records(1), + price_at_records(2), + price_at_records(3), + price_at_records(3), + price_at_records(4), + price_at_records(5), + ] + } + + fn median_test_candidates( + peer_quotes: &[(evmlib::EncodedPeerId, PaymentQuote)], + ) -> Vec<(evmlib::EncodedPeerId, PaymentQuote)> { + let mut sorted_quotes: Vec<_> = peer_quotes.iter().collect(); + sorted_quotes.sort_by_key(|(_, quote)| quote.price); + let median_index = median_quote_index(sorted_quotes.len()); + let median_price = sorted_quotes + .get(median_index) + .expect("median quote") + .1 + .price; + + sorted_quotes + .into_iter() + .filter(|(_, quote)| quote.price == median_price) + .map(|(peer_id, quote)| (peer_id.clone(), quote.clone())) + .collect() + } + + fn expected_median_payment(peer_quotes: &[(evmlib::EncodedPeerId, PaymentQuote)]) -> Amount { + let median_price = median_test_candidates(peer_quotes) + .first() + .expect("median candidate") + .1 + .price; + median_price * Amount::from(PAID_QUOTE_PAYMENT_MULTIPLIER) + } + + fn mark_close_group_paid_candidates( + verifier: &PaymentVerifier, + peer_quotes: &[(evmlib::EncodedPeerId, PaymentQuote)], + ) { + let close_group_peers = median_test_candidates(peer_quotes) + .iter() + .map(|(peer_id, _)| *peer_id.as_bytes()) + .collect(); + verifier.set_paid_quote_close_group_for_tests(close_group_peers); + } + + fn mark_candidate_paid(verifier: &PaymentVerifier, quote: &PaymentQuote, amount: Amount) { + verifier.set_completed_payment_for_tests(quote.hash(), amount); + } + + fn mark_all_median_candidates_unpaid( + verifier: &PaymentVerifier, + peer_quotes: &[(evmlib::EncodedPeerId, PaymentQuote)], + ) { + for (_, quote) in median_test_candidates(peer_quotes) { + mark_candidate_paid(verifier, "e, Amount::ZERO); + } + } + #[test] fn test_payment_required_for_new_data() { let verifier = create_test_verifier(); @@ -1693,6 +1869,33 @@ mod tests { assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified); } + #[tokio::test] + async fn test_paid_list_cache_entry_does_not_satisfy_client_put() { + let verifier = create_test_verifier(); + let xorname = [0xB8u8; 32]; + verifier.cache.insert_paid_list_verified(xorname); + + assert_eq!( + verifier.check_payment_required(&xorname, VerificationContext::PaidListAdmission), + PaymentStatus::CachedAsVerified, + "paid-list lookups must hit a paid-list-verified entry" + ); + assert_eq!( + verifier.check_payment_required(&xorname, VerificationContext::ClientPut), + PaymentStatus::PaymentRequired, + "client PUT must not fast-path on a paid-list-verified entry" + ); + + let err = verifier + .verify_payment(&xorname, None, VerificationContext::ClientPut) + .await + .expect_err("proof-less client PUT must not ride the paid-list entry"); + assert!( + format!("{err}").contains("Payment required"), + "client PUT must still demand payment: {err}" + ); + } + #[test] fn test_payment_status_can_store() { assert!(PaymentStatus::CachedAsVerified.can_store()); @@ -1827,17 +2030,508 @@ mod tests { ); } - #[test] - fn test_cache_len_getter() { + #[tokio::test] + async fn test_legacy_paid_median_full_path_accepted() { let verifier = create_test_verifier(); - assert_eq!(verifier.cache_len(), 0); + verifier.set_records_stored_for_tests(0); + let xorname = [0xA1u8; 32]; + let peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices()); + mark_close_group_paid_candidates(&verifier, &peer_quotes); + let expected_amount = expected_median_payment(&peer_quotes); + let paid_quote = median_test_candidates(&peer_quotes) + .first() + .expect("median candidate") + .1 + .clone(); + mark_candidate_paid(&verifier, &paid_quote, expected_amount); - verifier.cache.insert([10u8; 32]); - assert_eq!(verifier.cache_len(), 1); + let proof_bytes = serialize_proof(peer_quotes); + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await; - verifier.cache.insert([20u8; 32]); - assert_eq!(verifier.cache_len(), 2); - } + assert_eq!( + result.expect("paid median should verify"), + PaymentStatus::PaymentVerified + ); + } + + #[tokio::test] + async fn test_legacy_single_quote_proof_accepted() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(0); + let xorname = [0xB1u8; 32]; + let (peer_id, quote) = make_signed_quote(xorname, price_at_records(0), 1); + let peer_quotes = vec![(peer_id, quote.clone())]; + mark_close_group_paid_candidates(&verifier, &peer_quotes); + mark_candidate_paid(&verifier, "e, expected_median_payment(&peer_quotes)); + + let proof_bytes = serialize_proof(peer_quotes); + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await; + + assert_eq!( + result.expect("single paid quote should verify"), + PaymentStatus::PaymentVerified + ); + } + + #[tokio::test] + async fn test_legacy_single_quote_proof_requires_three_x_payment() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(0); + let xorname = [0xB2u8; 32]; + let (peer_id, quote) = make_signed_quote(xorname, price_at_records(0), 1); + let peer_quotes = vec![(peer_id, quote.clone())]; + mark_close_group_paid_candidates(&verifier, &peer_quotes); + mark_candidate_paid(&verifier, "e, quote.price); + + let proof_bytes = serialize_proof(peer_quotes); + let err = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await + .expect_err("single quote paid less than 3x should be rejected"); + + assert!( + format!("{err}").contains("not paid enough"), + "Error should mention underpayment: {err}" + ); + } + + #[tokio::test] + async fn test_legacy_too_many_quotes_rejected() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(0); + let xorname = [0xB3u8; 32]; + let mut peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices()); + peer_quotes.push(make_signed_quote(xorname, price_at_records(7), 8)); + + let proof_bytes = serialize_proof(peer_quotes); + let err = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await + .expect_err("proof with more than close-group quotes should be rejected"); + + assert!( + format!("{err}").contains("at most"), + "Error should mention max quote count: {err}" + ); + } + + #[tokio::test] + async fn test_legacy_structural_majority_price_at_median_accepted() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(1000); + let xorname = [0xA2u8; 32]; + let peer_quotes = make_signed_legacy_bundle( + xorname, + [ + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(100), + crate::payment::pricing::calculate_price(500), + crate::payment::pricing::calculate_price(1000), + crate::payment::pricing::calculate_price(2000), + crate::payment::pricing::calculate_price(4000), + crate::payment::pricing::calculate_price(6000), + ], + ); + mark_close_group_paid_candidates(&verifier, &peer_quotes); + let expected_amount = expected_median_payment(&peer_quotes); + let paid_quote = median_test_candidates(&peer_quotes) + .first() + .expect("median candidate") + .1 + .clone(); + mark_candidate_paid(&verifier, &paid_quote, expected_amount); + + let proof_bytes = serialize_proof(peer_quotes); + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await; + + assert_eq!( + result.expect("median-priced verifier should accept"), + PaymentStatus::PaymentVerified + ); + } + + #[tokio::test] + async fn test_legacy_above_median_verifier_rejected_by_floor() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(2000); + let xorname = [0xA3u8; 32]; + let peer_quotes = make_signed_legacy_bundle( + xorname, + [ + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(100), + crate::payment::pricing::calculate_price(500), + crate::payment::pricing::calculate_price(1000), + crate::payment::pricing::calculate_price(2000), + crate::payment::pricing::calculate_price(4000), + crate::payment::pricing::calculate_price(6000), + ], + ); + mark_close_group_paid_candidates(&verifier, &peer_quotes); + let expected_amount = expected_median_payment(&peer_quotes); + let paid_quote = median_test_candidates(&peer_quotes) + .first() + .expect("median candidate") + .1 + .clone(); + mark_candidate_paid(&verifier, &paid_quote, expected_amount); + + let proof_bytes = serialize_proof(peer_quotes); + let err = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await + .expect_err("above-median verifier should reject the client PUT"); + + assert!( + format!("{err}").contains("below local floor"), + "Error should mention paid-quote floor: {err}" + ); + } + + #[tokio::test] + async fn test_legacy_paid_median_issuer_close_group_rejection() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(0); + verifier.set_paid_quote_close_group_for_tests(vec![rand::random()]); + let xorname = [0xA4u8; 32]; + let peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices()); + let expected_amount = expected_median_payment(&peer_quotes); + let paid_quote = median_test_candidates(&peer_quotes) + .first() + .expect("median candidate") + .1 + .clone(); + mark_candidate_paid(&verifier, &paid_quote, expected_amount); + + let proof_bytes = serialize_proof(peer_quotes); + let err = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await + .expect_err("out-of-close-group paid issuer should be rejected"); + + assert!( + format!("{err}").contains("not among this node's local"), + "Error should mention local close-group peers: {err}" + ); + } + + #[tokio::test] + async fn test_legacy_paid_median_floor_rejection() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(6000); + let xorname = [0xA5u8; 32]; + let peer_quotes = make_signed_legacy_bundle( + xorname, + [ + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(0), + ], + ); + mark_close_group_paid_candidates(&verifier, &peer_quotes); + let expected_amount = expected_median_payment(&peer_quotes); + let paid_quote = median_test_candidates(&peer_quotes) + .first() + .expect("median candidate") + .1 + .clone(); + mark_candidate_paid(&verifier, &paid_quote, expected_amount); + + let proof_bytes = serialize_proof(peer_quotes); + let err = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await + .expect_err("cheap paid median should be rejected"); + + assert!( + format!("{err}").contains("below local floor"), + "Error should mention local floor: {err}" + ); + } + + #[tokio::test] + async fn test_legacy_zero_price_median_rejected() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(0); + let xorname = [0xA6u8; 32]; + let peer_quotes = make_signed_legacy_bundle( + xorname, + [ + Amount::ZERO, + Amount::ZERO, + Amount::ZERO, + Amount::ZERO, + Amount::from(1u64), + Amount::from(2u64), + Amount::from(3u64), + ], + ); + + let proof_bytes = serialize_proof(peer_quotes); + let err = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await + .expect_err("zero median must be rejected"); + + assert!( + format!("{err}").contains("zero price"), + "Error should mention zero price: {err}" + ); + } + + #[tokio::test] + async fn test_legacy_paid_quote_content_mismatch_rejected() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(0); + let xorname = [0xA7u8; 32]; + let mut peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices()); + let median_index = median_quote_index(peer_quotes.len()); + peer_quotes[median_index].1.content = xor_name::XorName([0xE7u8; 32]); + mark_close_group_paid_candidates(&verifier, &peer_quotes); + + let proof_bytes = serialize_proof(peer_quotes); + let err = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await + .expect_err("paid quote content mismatch should be rejected"); + + assert!( + format!("{err}").contains("content address mismatch"), + "Error should mention content mismatch: {err}" + ); + } + + #[tokio::test] + async fn test_legacy_unpaid_quote_content_mismatch_accepted() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(0); + let xorname = [0xA8u8; 32]; + let mut peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices()); + peer_quotes[0].1.content = xor_name::XorName([0xE8u8; 32]); + mark_close_group_paid_candidates(&verifier, &peer_quotes); + let expected_amount = expected_median_payment(&peer_quotes); + let paid_quote = median_test_candidates(&peer_quotes) + .first() + .expect("median candidate") + .1 + .clone(); + mark_candidate_paid(&verifier, &paid_quote, expected_amount); + + let proof_bytes = serialize_proof(peer_quotes); + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await; + + assert_eq!( + result.expect("unpaid content mismatch should be ignored"), + PaymentStatus::PaymentVerified + ); + } + + #[tokio::test] + async fn test_legacy_paid_quote_bad_signature_rejected() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(0); + let xorname = [0xA9u8; 32]; + let mut peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices()); + let median_index = median_quote_index(peer_quotes.len()); + peer_quotes[median_index].1.signature.push(0xFF); + mark_close_group_paid_candidates(&verifier, &peer_quotes); + let expected_amount = expected_median_payment(&peer_quotes); + let paid_quote = median_test_candidates(&peer_quotes) + .first() + .expect("median candidate") + .1 + .clone(); + mark_candidate_paid(&verifier, &paid_quote, expected_amount); + + let proof_bytes = serialize_proof(peer_quotes); + let err = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await + .expect_err("paid bad signature should be rejected"); + + assert!( + format!("{err}").contains("signature verification failed"), + "Error should mention signature failure: {err}" + ); + } + + #[tokio::test] + async fn test_legacy_unpaid_quote_bad_signature_accepted() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(0); + let xorname = [0xAAu8; 32]; + let mut peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices()); + peer_quotes[0].1.signature.push(0xFF); + mark_close_group_paid_candidates(&verifier, &peer_quotes); + let expected_amount = expected_median_payment(&peer_quotes); + let paid_quote = median_test_candidates(&peer_quotes) + .first() + .expect("median candidate") + .1 + .clone(); + mark_candidate_paid(&verifier, &paid_quote, expected_amount); + + let proof_bytes = serialize_proof(peer_quotes); + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await; + + assert_eq!( + result.expect("unpaid bad signature should be ignored"), + PaymentStatus::PaymentVerified + ); + } + + #[tokio::test] + async fn test_legacy_unpaid_peer_binding_mismatch_accepted() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(0); + let xorname = [0xABu8; 32]; + let mut peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices()); + peer_quotes[0].0 = evmlib::EncodedPeerId::new(rand::random()); + mark_close_group_paid_candidates(&verifier, &peer_quotes); + let expected_amount = expected_median_payment(&peer_quotes); + let paid_quote = median_test_candidates(&peer_quotes) + .first() + .expect("median candidate") + .1 + .clone(); + mark_candidate_paid(&verifier, &paid_quote, expected_amount); + + let proof_bytes = serialize_proof(peer_quotes); + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await; + + assert_eq!( + result.expect("unpaid peer binding mismatch should be ignored"), + PaymentStatus::PaymentVerified + ); + } + + #[tokio::test] + async fn test_legacy_median_tie_accepts_paid_candidate() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(0); + let xorname = [0xACu8; 32]; + let peer_quotes = make_signed_legacy_bundle(xorname, tied_median_test_prices()); + mark_close_group_paid_candidates(&verifier, &peer_quotes); + mark_all_median_candidates_unpaid(&verifier, &peer_quotes); + let expected_amount = expected_median_payment(&peer_quotes); + let paid_quote = median_test_candidates(&peer_quotes) + .get(1) + .expect("second tied median candidate") + .1 + .clone(); + mark_candidate_paid(&verifier, &paid_quote, expected_amount); + + let proof_bytes = serialize_proof(peer_quotes); + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) + .await; + + assert_eq!( + result.expect("one paid tied median candidate should verify"), + PaymentStatus::PaymentVerified + ); + } + + #[tokio::test] + async fn test_legacy_paid_list_admission_enforces_issuer_close_group() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(0); + verifier.set_paid_quote_close_group_for_tests(Vec::new()); + let xorname = [0xB5u8; 32]; + let peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices()); + let expected_amount = expected_median_payment(&peer_quotes); + let paid_quote = median_test_candidates(&peer_quotes) + .first() + .expect("median candidate") + .1 + .clone(); + mark_candidate_paid(&verifier, &paid_quote, expected_amount); + + let proof_bytes = serialize_proof(peer_quotes); + let err = verifier + .verify_payment( + &xorname, + Some(&proof_bytes), + VerificationContext::PaidListAdmission, + ) + .await + .expect_err("paid-list admission must enforce the paid issuer close-group check"); + + assert!( + format!("{err}").contains("not among this node's local"), + "Error should mention local close-group peers: {err}" + ); + } + + #[tokio::test] + async fn test_legacy_paid_list_admission_enforces_full_bundle_floor() { + let verifier = create_test_verifier(); + verifier.set_records_stored_for_tests(6000); + let xorname = [0xB6u8; 32]; + let peer_quotes = make_signed_legacy_bundle( + xorname, + [ + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(0), + crate::payment::pricing::calculate_price(0), + ], + ); + mark_close_group_paid_candidates(&verifier, &peer_quotes); + let expected_amount = expected_median_payment(&peer_quotes); + let paid_quote = median_test_candidates(&peer_quotes) + .first() + .expect("median candidate") + .1 + .clone(); + mark_candidate_paid(&verifier, &paid_quote, expected_amount); + + let proof_bytes = serialize_proof(peer_quotes); + let err = verifier + .verify_payment( + &xorname, + Some(&proof_bytes), + VerificationContext::PaidListAdmission, + ) + .await + .expect_err("paid-list admission must enforce the floor for full bundles"); + + assert!( + format!("{err}").contains("below local floor"), + "Error should mention the local price floor: {err}" + ); + } + + #[test] + fn test_cache_len_getter() { + let verifier = create_test_verifier(); + assert_eq!(verifier.cache_len(), 0); + + verifier.cache.insert([10u8; 32]); + assert_eq!(verifier.cache_len(), 1); + + verifier.cache.insert([20u8; 32]); + assert_eq!(verifier.cache_len(), 2); + } #[test] fn test_cache_stats_after_operations() { @@ -2020,195 +2714,6 @@ mod tests { } } - /// Helper: create a fake quote whose price encodes the supplied record count. - fn make_fake_quote_at_records( - xorname: [u8; 32], - timestamp: SystemTime, - rewards_address: RewardsAddress, - records: usize, - ) -> evmlib::PaymentQuote { - let mut quote = make_fake_quote(xorname, timestamp, rewards_address); - quote.price = crate::payment::pricing::calculate_price(records); - quote - } - - /// A small upward record drift between quoting and verifying — the normal - /// in-flight churn on a busy network — must pass. The old fixed 5-record - /// tolerance rejected a drift of 10 as "stale by 10 records"; the - /// price-based gate sees a negligible price move on the near-flat curve and - /// accepts it. - #[test] - fn test_small_record_drift_accepted() { - use evmlib::{EncodedPeerId, RewardsAddress}; - - let verifier = create_test_verifier(); - // Node gained 10 records since quoting (100 -> 110). - verifier.set_records_stored_for_tests(110); - let self_id: [u8; 32] = rand::random(); - verifier.set_peer_id_for_tests(self_id); - let quote = make_fake_quote_at_records( - [0xE0u8; 32], - SystemTime::now(), - RewardsAddress::new([1u8; 20]), - 100, - ); - let payment = ProofOfPayment { - peer_quotes: vec![(EncodedPeerId::new(self_id), quote)], - }; - - verifier - .validate_quote_freshness(&payment) - .expect("benign in-flight drift should pass"); - } - - /// Over-payment must always be accepted: the node had MORE records when it - /// quoted than it does now (e.g. it pruned), so the client paid for a - /// fuller, pricier node. The old symmetric `abs_diff` gate wrongly rejected - /// this; ~36% of STG-01 rejections were exactly this case. - #[test] - fn test_overpayment_accepted() { - use evmlib::{EncodedPeerId, RewardsAddress}; - - let verifier = create_test_verifier(); - // Quote priced at 6000 records, but node now holds only 100. - verifier.set_records_stored_for_tests(100); - let self_id: [u8; 32] = rand::random(); - verifier.set_peer_id_for_tests(self_id); - let quote = make_fake_quote_at_records( - [0xE2u8; 32], - SystemTime::now(), - RewardsAddress::new([1u8; 20]), - 6000, - ); - let payment = ProofOfPayment { - peer_quotes: vec![(EncodedPeerId::new(self_id), quote)], - }; - - verifier - .validate_quote_freshness(&payment) - .expect("over-payment must never be rejected"); - } - - /// Genuine staleness — a quote that under-prices the node's current fullness - /// by far more than the tolerance — is still rejected. Quote encodes 100 - /// records but the node now holds 6000, so the quadratic curve makes the - /// paid price a small fraction of the current price. - #[test] - fn test_underpriced_quote_rejected() { - use evmlib::{EncodedPeerId, RewardsAddress}; - - let verifier = create_test_verifier(); - verifier.set_records_stored_for_tests(6000); - let self_id: [u8; 32] = rand::random(); - verifier.set_peer_id_for_tests(self_id); - let quote = make_fake_quote_at_records( - [0xE1u8; 32], - SystemTime::now(), - RewardsAddress::new([1u8; 20]), - 100, - ); - let payment = ProofOfPayment { - peer_quotes: vec![(EncodedPeerId::new(self_id), quote)], - }; - - let err = verifier - .validate_quote_freshness(&payment) - .expect_err("a quote underpricing by >25% should fail"); - assert!(format!("{err}").contains("stale")); - } - - /// Regression test for the PROD-UL-01 `DataMap` failure (2026-06-04): a - /// close group whose fullness spans 47..=1788 records produces a bundle - /// where the emptiest node's honest quote prices far below a full node's - /// 75% floor. The verifying node must gate only its OWN quote — a - /// neighbour's cheap-but-honest quote is not evidence of staleness. - #[test] - fn test_neighbour_cheap_quote_not_rejected() { - use evmlib::{EncodedPeerId, RewardsAddress}; - - let verifier = create_test_verifier(); - // This node holds 1788 records (the fullest rejector in the incident). - verifier.set_records_stored_for_tests(1788); - let self_id: [u8; 32] = rand::random(); - verifier.set_peer_id_for_tests(self_id); - - let xorname = [0xE3u8; 32]; - let rewards = RewardsAddress::new([1u8; 20]); - // Own quote is fresh: priced at our own current fullness. - let own_quote = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 1788); - // Neighbour quotes from a heterogeneous close group, including a - // nearly-empty node at 47 records (price far below our 75% floor). - let neighbour_47 = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 47); - let neighbour_978 = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 978); - - let payment = ProofOfPayment { - peer_quotes: vec![ - (EncodedPeerId::new(rand::random()), neighbour_47), - (EncodedPeerId::new(self_id), own_quote), - (EncodedPeerId::new(rand::random()), neighbour_978), - ], - }; - - verifier - .validate_quote_freshness(&payment) - .expect("neighbours' cheaper quotes must not trip this node's own staleness gate"); - } - - /// The own-quote gate still bites: if THIS node's own quote in the bundle - /// underprices its current fullness beyond tolerance, the payment is - /// rejected even when every neighbour quote looks expensive. - #[test] - fn test_own_stale_quote_still_rejected_among_neighbours() { - use evmlib::{EncodedPeerId, RewardsAddress}; - - let verifier = create_test_verifier(); - verifier.set_records_stored_for_tests(6000); - let self_id: [u8; 32] = rand::random(); - verifier.set_peer_id_for_tests(self_id); - - let xorname = [0xE4u8; 32]; - let rewards = RewardsAddress::new([1u8; 20]); - let own_stale = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 100); - let neighbour = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 7000); - - let payment = ProofOfPayment { - peer_quotes: vec![ - (EncodedPeerId::new(rand::random()), neighbour), - (EncodedPeerId::new(self_id), own_stale), - ], - }; - - let err = verifier - .validate_quote_freshness(&payment) - .expect_err("own underpriced quote must still be rejected"); - assert!(format!("{err}").contains("stale")); - } - - /// Without a self peer-id source (no `P2PNode` attached, no test override) - /// the gate skips rather than rejecting — mirroring the missing - /// record-count-source behaviour. - #[test] - fn test_freshness_skipped_without_self_peer_id() { - use evmlib::{EncodedPeerId, RewardsAddress}; - - let verifier = create_test_verifier(); - verifier.set_records_stored_for_tests(6000); - // NOTE: no set_peer_id_for_tests call. - let quote = make_fake_quote_at_records( - [0xE5u8; 32], - SystemTime::now(), - RewardsAddress::new([1u8; 20]), - 100, - ); - let payment = ProofOfPayment { - peer_quotes: vec![(EncodedPeerId::new(rand::random()), quote)], - }; - - verifier - .validate_quote_freshness(&payment) - .expect("gate must fail open when self identity is unknown"); - } - /// Helper: wrap quotes into a tagged serialized `PaymentProof`. fn serialize_proof(peer_quotes: Vec<(evmlib::EncodedPeerId, evmlib::PaymentQuote)>) -> Vec { use crate::payment::proof::{serialize_single_node_proof, PaymentProof}; @@ -2378,54 +2883,6 @@ mod tests { evmlib::EncodedPeerId::new(*ant_peer_id.as_bytes()) } - #[tokio::test] - async fn test_local_not_in_paid_set_rejected() { - use evmlib::RewardsAddress; - use saorsa_core::MlDsa65; - use saorsa_pqc::pqc::MlDsaOperations; - - // Verifier with a local rewards address set - let local_addr = RewardsAddress::new([0xAAu8; 20]); - let config = PaymentVerifierConfig { - evm: EvmVerifierConfig { - network: EvmNetwork::ArbitrumOne, - }, - cache_capacity: 100, - local_rewards_address: local_addr, - }; - let verifier = PaymentVerifier::new(config); - - let xorname = [0xEEu8; 32]; - // Quotes pay a DIFFERENT rewards address - let other_addr = RewardsAddress::new([0xBBu8; 20]); - - // Use real ML-DSA keys so the pub_key→peer_id binding check passes - let ml_dsa = MlDsa65::new(); - let mut peer_quotes = Vec::new(); - for _ in 0..CLOSE_GROUP_SIZE { - let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen"); - let pub_key_bytes = public_key.as_bytes().to_vec(); - let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes); - - let mut quote = make_fake_quote(xorname, SystemTime::now(), other_addr); - quote.pub_key = pub_key_bytes; - - peer_quotes.push((encoded, quote)); - } - - let proof_bytes = serialize_proof(peer_quotes); - let result = verifier - .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) - .await; - - assert!(result.is_err(), "Should reject payment not addressed to us"); - let err_msg = format!("{}", result.expect_err("should fail")); - assert!( - err_msg.contains("does not include this node as a recipient"), - "Error should mention recipient rejection: {err_msg}" - ); - } - #[tokio::test] async fn test_wrong_peer_binding_rejected() { use evmlib::{EncodedPeerId, RewardsAddress}; @@ -2466,200 +2923,16 @@ mod tests { } // ========================================================================= - // VerificationContext tests — Replication must skip the - // storer-being-paid-now checks (own-quote freshness, local recipient, - // merkle candidate closeness) while keeping every receipt-authenticity - // check. Each test runs the same proof under both contexts and asserts - // the context-gated check fires only under ClientPut. Where a proof - // can't reach Ok(()) without on-chain access, "skipped" is proven by the - // error moving PAST the gated check to a later stage. + // VerificationContext tests — both contexts verify fresh proof admissions. + // Later neighbour-sync repair has no proof-of-payment and is authorized by + // closest-7 storage quorum or closest-K paid-list quorum instead. // ========================================================================= - /// A bundle whose own quote is stale (quoted 100 records, node now holds - /// 6000) is rejected by the freshness gate under `ClientPut`, but under - /// `Replication` the gate is skipped: verification proceeds to the next - /// stage (peer bindings, which fail on the fake `pub_keys`). + /// Content binding is required for every fresh proof context. A receipt for + /// chunk A cannot admit chunk B as either a direct/fresh store or a fresh + /// paid-list update. #[tokio::test] - async fn test_replication_context_skips_own_quote_freshness() { - use evmlib::{EncodedPeerId, RewardsAddress}; - - let verifier = create_test_verifier(); - verifier.set_records_stored_for_tests(6000); - let self_id: [u8; 32] = rand::random(); - verifier.set_peer_id_for_tests(self_id); - - let xorname = [0xD0u8; 32]; - let rewards = RewardsAddress::new([1u8; 20]); - let own_stale = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 100); - let mut peer_quotes = vec![(EncodedPeerId::new(self_id), own_stale)]; - for _ in 1..CLOSE_GROUP_SIZE { - let neighbour = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 6000); - peer_quotes.push((EncodedPeerId::new(rand::random()), neighbour)); - } - let proof_bytes = serialize_proof(peer_quotes); - - let err = verifier - .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) - .await - .expect_err("own stale quote must be rejected on a client PUT"); - assert!( - format!("{err}").contains("stale"), - "ClientPut must fail at the freshness gate: {err}" - ); - - let err = verifier - .verify_payment( - &xorname, - Some(&proof_bytes), - VerificationContext::Replication, - ) - .await - .expect_err("fake pub_keys still fail peer bindings"); - let msg = format!("{err}"); - assert!( - !msg.contains("stale"), - "Replication must skip the freshness gate: {msg}" - ); - assert!( - msg.contains("Invalid ML-DSA public key"), - "Replication should fail at the LATER peer-binding stage: {msg}" - ); - } - - /// A receipt that pays a different node's rewards address is rejected by - /// the local-recipient check under `ClientPut`, but under `Replication` - /// (a post-churn close-group member was never a payee) the check is - /// skipped: verification proceeds to quote-signature verification. - #[tokio::test] - async fn test_replication_context_skips_local_recipient() { - use evmlib::RewardsAddress; - use saorsa_core::MlDsa65; - use saorsa_pqc::pqc::MlDsaOperations; - - let local_addr = RewardsAddress::new([0xAAu8; 20]); - let config = PaymentVerifierConfig { - evm: EvmVerifierConfig { - network: EvmNetwork::ArbitrumOne, - }, - cache_capacity: 100, - local_rewards_address: local_addr, - }; - let verifier = PaymentVerifier::new(config); - - let xorname = [0xD1u8; 32]; - // Quotes pay a DIFFERENT rewards address. - let other_addr = RewardsAddress::new([0xBBu8; 20]); - - // Real ML-DSA keys so the pub_key→peer_id binding check passes and - // the first divergence between contexts is the recipient check. - let ml_dsa = MlDsa65::new(); - let mut peer_quotes = Vec::new(); - for _ in 0..CLOSE_GROUP_SIZE { - let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen"); - let pub_key_bytes = public_key.as_bytes().to_vec(); - let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes); - let mut quote = make_fake_quote(xorname, SystemTime::now(), other_addr); - quote.pub_key = pub_key_bytes; - peer_quotes.push((encoded, quote)); - } - let proof_bytes = serialize_proof(peer_quotes); - - let err = verifier - .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut) - .await - .expect_err("payment not addressed to us must fail on a client PUT"); - assert!( - format!("{err}").contains("does not include this node as a recipient"), - "ClientPut must fail at the recipient check: {err}" - ); - - let err = verifier - .verify_payment( - &xorname, - Some(&proof_bytes), - VerificationContext::Replication, - ) - .await - .expect_err("fake quote signatures still fail signature verification"); - let msg = format!("{err}"); - assert!( - !msg.contains("recipient"), - "Replication must skip the recipient check: {msg}" - ); - assert!( - msg.contains("signature verification failed"), - "Replication should fail at the LATER signature stage: {msg}" - ); - } - - /// A `Replication`-verified cache entry must not satisfy a later - /// `ClientPut` fast-path: the context-gated checks were never run for it, - /// so letting it short-circuit a client PUT would bypass them via the - /// cache. It must still satisfy later `Replication` lookups (re-offers of - /// the same key are routine), and a subsequent full `ClientPut` - /// verification upgrades the entry without ever being downgraded back. - #[tokio::test] - async fn test_replication_verified_cache_entry_does_not_satisfy_client_put() { - let verifier = create_test_verifier(); - let xorname = [0xD4u8; 32]; - - // Simulate a successful Replication-context verification. - verifier.cache.insert_replication_verified(xorname); - - assert_eq!( - verifier.check_payment_required(&xorname, VerificationContext::Replication), - PaymentStatus::CachedAsVerified, - "replication lookups must hit a replication-verified entry" - ); - assert_eq!( - verifier.check_payment_required(&xorname, VerificationContext::ClientPut), - PaymentStatus::PaymentRequired, - "a client PUT must not fast-path on a replication-verified entry" - ); - - // End-to-end: a proof-less client PUT is still rejected, while a - // proof-less replication re-check passes via the cache. - let result = verifier - .verify_payment(&xorname, None, VerificationContext::Replication) - .await; - assert_eq!( - result.expect("replication re-check should hit the cache"), - PaymentStatus::CachedAsVerified - ); - let err = verifier - .verify_payment(&xorname, None, VerificationContext::ClientPut) - .await - .expect_err("proof-less client PUT must not ride the replication entry"); - assert!( - format!("{err}").contains("Payment required"), - "client PUT must still demand payment: {err}" - ); - - // A full ClientPut verification upgrades the entry... - verifier.cache.insert(xorname); - assert_eq!( - verifier.check_payment_required(&xorname, VerificationContext::ClientPut), - PaymentStatus::CachedAsVerified, - "a full client-PUT verification must upgrade the entry" - ); - - // ...and a later replication re-verification never downgrades it. - verifier.cache.insert_replication_verified(xorname); - assert_eq!( - verifier.check_payment_required(&xorname, VerificationContext::ClientPut), - PaymentStatus::CachedAsVerified, - "replication re-verification must not downgrade a client-PUT entry" - ); - } - - /// Receipt authenticity is NOT relaxed under `Replication`: a bundle whose - /// quotes are bound to a different content address is rejected in both - /// contexts. A neighbour cannot replay a receipt for chunk A to get - /// chunk B admitted. - #[tokio::test] - async fn test_replication_context_still_rejects_content_mismatch() { - use evmlib::{EncodedPeerId, RewardsAddress}; - + async fn test_fresh_contexts_reject_content_mismatch() { let verifier = create_test_verifier(); let stored_xorname = [0xD2u8; 32]; let quoted_xorname = [0xD3u8; 32]; @@ -2668,13 +2941,13 @@ mod tests { let mut peer_quotes = Vec::new(); for _ in 0..CLOSE_GROUP_SIZE { let quote = make_fake_quote(quoted_xorname, SystemTime::now(), rewards); - peer_quotes.push((EncodedPeerId::new(rand::random()), quote)); + peer_quotes.push((evmlib::EncodedPeerId::new(rand::random()), quote)); } let proof_bytes = serialize_proof(peer_quotes); for context in [ VerificationContext::ClientPut, - VerificationContext::Replication, + VerificationContext::PaidListAdmission, ] { let err = verifier .verify_payment(&stored_xorname, Some(&proof_bytes), context) @@ -2688,15 +2961,13 @@ mod tests { } /// The merkle pay-yourself closeness defence (including its duplicate- - /// candidate pre-check, which runs without a `P2PNode`) applies to client - /// PUTs only. Under `Replication` the pool was sampled from the DHT of - /// the original sale, so the live-DHT check is skipped and verification - /// proceeds to the on-chain stages. + /// candidate pre-check, which runs without a `P2PNode`) applies to every + /// proof verification context because every context is a fresh admission. #[tokio::test] - async fn test_replication_context_skips_merkle_closeness() { + async fn test_fresh_contexts_enforce_merkle_closeness() { let verifier = create_test_verifier(); - let (mut merkle_proof, _pool_hash, xorname, timestamp) = make_valid_merkle_proof(); + let (mut merkle_proof, _pool_hash, xorname, _timestamp) = make_valid_merkle_proof(); // 16 copies of one real candidate: every self-signature is valid, but // the candidate PeerIds are duplicates — the closeness pre-check @@ -2710,45 +2981,22 @@ mod tests { for c in &mut merkle_proof.winner_pool.candidate_nodes { *c = shared.clone(); } - let pool_hash = merkle_proof.winner_pool_hash(); - - // Seed the pool cache with a deliberately mismatched timestamp so the - // Replication path fails deterministically AFTER the (skipped) - // closeness check, without needing on-chain access. - { - let info = evmlib::merkle_payments::OnChainPaymentInfo { - depth: 4, - merkle_payment_timestamp: timestamp + 1, - paid_node_addresses: vec![], - }; - verifier.pool_cache.lock().put(pool_hash, info); - } - let tagged = crate::payment::proof::serialize_merkle_proof(&merkle_proof).expect("serialize"); - let err = verifier - .verify_payment(&xorname, Some(&tagged), VerificationContext::ClientPut) - .await - .expect_err("duplicate candidate PeerIds must fail the client-PUT closeness check"); - assert!( - format!("{err}").contains("duplicate candidate PeerId"), - "ClientPut must fail at the closeness pre-check: {err}" - ); - - let err = verifier - .verify_payment(&xorname, Some(&tagged), VerificationContext::Replication) - .await - .expect_err("seeded timestamp mismatch still fails after the skipped check"); - let msg = format!("{err}"); - assert!( - !msg.contains("duplicate candidate PeerId"), - "Replication must skip the closeness check: {msg}" - ); - assert!( - msg.contains("timestamp mismatch"), - "Replication should fail at the LATER timestamp stage: {msg}" - ); + for context in [ + VerificationContext::ClientPut, + VerificationContext::PaidListAdmission, + ] { + let err = verifier + .verify_payment(&xorname, Some(&tagged), context) + .await + .expect_err("duplicate candidate PeerIds must fail fresh admission closeness"); + assert!( + format!("{err}").contains("duplicate candidate PeerId"), + "{context:?} must fail at the closeness pre-check: {err}" + ); + } } // ========================================================================= @@ -3246,6 +3494,7 @@ mod tests { let config = PaymentVerifierConfig { evm: EvmVerifierConfig::default(), cache_capacity: 100, + close_group_size: CLOSE_GROUP_SIZE, local_rewards_address: RewardsAddress::new([1u8; 20]), }; let verifier = PaymentVerifier::new(config); diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 0e0995c7..c2b89f7f 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -72,6 +72,14 @@ use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent}; /// Prefix used by saorsa-core's request-response mechanism. const RR_PREFIX: &str = "/rr/"; +fn fresh_offer_payment_context() -> VerificationContext { + VerificationContext::ClientPut +} + +fn paid_notify_payment_context() -> VerificationContext { + VerificationContext::PaidListAdmission +} + /// Boxed future type for in-flight fetch tasks. type FetchFuture = Pin)> + Send>>; @@ -1135,6 +1143,39 @@ async fn handle_fresh_offer( return Ok(()); } + // Mirror the normal PUT path: the advertised key must be the content + // address of the supplied bytes before any expensive payment verification. + let computed_key = crate::client::compute_address(&offer.data); + if computed_key != offer.key { + warn!( + "Rejecting fresh offer for key {}: content address mismatch, computed {}", + hex::encode(offer.key), + hex::encode(computed_key), + ); + p2p_node + .report_trust_event( + source, + TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT), + ) + .await; + send_replication_response( + source, + p2p_node, + request_id, + ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected { + key: offer.key, + reason: format!( + "Content address mismatch: expected {}, computed {}", + hex::encode(offer.key), + hex::encode(computed_key), + ), + }), + rr_message_id, + ) + .await; + return Ok(()); + } + // Rule 7: check responsibility. if !admission::is_responsible(&self_id, &offer.key, p2p_node, config.close_group_size).await { send_replication_response( @@ -1177,17 +1218,18 @@ async fn handle_fresh_offer( return Ok(()); } - // Gap 1: Validate PoP via PaymentVerifier. This is an already-settled - // receipt handed over by a neighbour, not a live sale — Replication - // context skips the storer-being-paid-now checks (own-quote price - // freshness, local recipient, merkle candidate closeness) that would - // otherwise reject every honest hand-over once counts grow, the close - // group churns, or the live DHT drifts from the pool's original sample. + // Gap 1: Validate PoP via PaymentVerifier. Fresh replication is still + // part of the immediate write fan-out: this receiver is about to store the + // record as if the client had PUT it here directly. Receiver responsibility + // was checked above before proof work. ClientPut verification applies + // store-strength cache semantics, paid-quote issuer close-group and local + // price floor checks for single-node proofs, and merkle candidate + // closeness for merkle proofs. match payment_verifier .verify_payment( &offer.key, Some(&offer.proof_of_payment), - VerificationContext::Replication, + fresh_offer_payment_context(), ) .await { @@ -1301,13 +1343,15 @@ async fn handle_paid_notify( return Ok(()); } - // Gap 1: Validate PoP via PaymentVerifier. Same as the fresh-offer path: - // a settled receipt, so Replication context (see VerificationContext). + // Gap 1: Validate PoP via PaymentVerifier. PaidNotify admits fresh + // paid-list metadata, so local paid-list close-group membership was checked + // above before proof work. The verifier then runs the same payment proof + // checks as ClientPut while writing a paid-list-strength cache entry. match payment_verifier .verify_payment( ¬ify.key, Some(¬ify.proof_of_payment), - VerificationContext::Replication, + paid_notify_payment_context(), ) .await { @@ -2695,9 +2739,29 @@ fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool { #[cfg(test)] #[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] mod tests { - use super::{audit_failure_clears_bootstrap_claim, first_failed_key_label}; + use super::{ + audit_failure_clears_bootstrap_claim, first_failed_key_label, fresh_offer_payment_context, + paid_notify_payment_context, + }; + use crate::payment::VerificationContext; use crate::replication::types::AuditFailureReason; + #[test] + fn fresh_offer_runs_client_put_payment_checks() { + assert_eq!( + fresh_offer_payment_context(), + VerificationContext::ClientPut + ); + } + + #[test] + fn paid_notify_uses_paid_list_admission_payment_checks() { + assert_eq!( + paid_notify_payment_context(), + VerificationContext::PaidListAdmission + ); + } + #[test] fn audit_timeout_preserves_active_bootstrap_claim() { assert!(!audit_failure_clears_bootstrap_claim( diff --git a/src/storage/handler.rs b/src/storage/handler.rs index 26d20d77..d5421d09 100644 --- a/src/storage/handler.rs +++ b/src/storage/handler.rs @@ -41,6 +41,8 @@ use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext}; use crate::replication::fresh::FreshWriteEvent; use crate::storage::lmdb::LmdbStorage; use bytes::Bytes; +use parking_lot::RwLock; +use saorsa_core::P2PNode; use std::sync::Arc; use tokio::sync::mpsc; @@ -56,8 +58,13 @@ pub struct AntProtocol { /// Quote generator for creating storage quotes. /// Also handles merkle candidate quote signing via ML-DSA-65. quote_generator: Arc, + /// P2P node handle used for direct PUT receiver responsibility checks. + p2p_node: RwLock>>, /// Channel for notifying the replication engine about newly-stored chunks. fresh_write_tx: Option>, + /// Test-only override for direct PUT storage responsibility checks. + #[cfg(any(test, feature = "test-utils"))] + test_store_membership_override: RwLock>, } impl AntProtocol { @@ -74,14 +81,13 @@ impl AntProtocol { payment_verifier: Arc, quote_generator: Arc, ) -> Self { - // Keep the PaymentVerifier's freshness gate AND the QuoteGenerator's - // pricing wired to the same authoritative store used by this protocol - // handler. Pricing and the freshness gate MUST read the same record - // count: the generator prices a quote from current_chunks() and the - // verifier later checks the quote against current_chunks(), so the only - // difference they see is genuine in-flight growth. Attaching both here - // makes the invariant automatic for every AntProtocol construction - // path, including tests and future startup variants. + // Keep the PaymentVerifier's paid-quote price floor and the + // QuoteGenerator's pricing wired to the same authoritative store used + // by this protocol handler. Both must read the same record count: the + // generator prices quotes from current_chunks(), and the verifier later + // checks the paid median quote against current_chunks(). Attaching both + // here makes the invariant automatic for every AntProtocol + // construction path, including tests and future startup variants. payment_verifier.attach_storage(Arc::clone(&storage)); quote_generator.attach_storage(Arc::clone(&storage)); @@ -89,10 +95,31 @@ impl AntProtocol { storage, payment_verifier, quote_generator, + p2p_node: RwLock::new(None), fresh_write_tx: None, + #[cfg(any(test, feature = "test-utils"))] + test_store_membership_override: RwLock::new(None), } } + /// Attach the node's P2P handle for direct PUT responsibility and payment + /// live-DHT checks. + /// + /// Also wires the same handle into the payment verifier so payment-proof + /// closeness checks can use the live routing view. Idempotent: calling + /// twice replaces both handles. + pub fn attach_p2p_node(&self, node: Arc) { + *self.p2p_node.write() = Some(Arc::clone(&node)); + self.payment_verifier.attach_p2p_node(node); + debug!("AntProtocol: P2PNode attached for payment live-DHT checks"); + } + + /// Test-only setter for direct PUT storage responsibility checks. + #[cfg(any(test, feature = "test-utils"))] + pub fn set_store_membership_for_tests(&self, is_member: bool) { + *self.test_store_membership_override.write() = Some(is_member); + } + /// Set the channel sender for fresh-write replication events. /// /// When set, successful chunk PUTs will notify the replication engine @@ -243,7 +270,14 @@ impl AntProtocol { Ok(false) => {} } - // 4. Cheap disk-space pre-check — runs BEFORE the expensive payment + // 4. Check storage responsibility before payment verification. A node + // should only accept the actual chunk when its local routing table + // places it in the configured close group for the chunk address. + if let Err(e) = self.validate_store_membership(&address).await { + return ChunkPutResponse::Error(e); + } + + // 5. Cheap disk-space pre-check — runs BEFORE the expensive payment // verification path (ML-DSA pool checks, a Kademlia closeness // lookup, and an on-chain Arbitrum RPC). A disk-full node can never // satisfy this PUT, so reject it here rather than burning that work @@ -262,9 +296,9 @@ impl AntProtocol { return ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string())); } - // 5. Verify payment. This node is the storer being paid right now, so - // the full ClientPut check set applies (own-quote price freshness, - // local recipient, merkle candidate closeness). + // 6. Verify payment. The ClientPut context applies the store-strength + // payment cache and verifies live proofs. Storage responsibility was + // checked above before any proof work. let payment_result = self .payment_verifier .verify_payment( @@ -288,19 +322,19 @@ impl AntProtocol { } } - // 6. Store chunk + // 7. Store chunk match self.storage.put(&address, &request.content).await { Ok(_) => { let content_len = request.content.len(); info!("Stored chunk {addr_hex} ({content_len} bytes)"); // Bump the in-memory fallback counter. Both pricing and the - // freshness gate now read LmdbStorage::current_chunks() directly, + // paid-quote floor now read LmdbStorage::current_chunks() directly, // so this counter only matters when no storage is attached // (unit tests / mis-configured startup). Kept warm so that // fallback path stays roughly accurate. self.quote_generator.record_store(); - // 7. Notify replication engine for fresh fan-out. + // 8. Notify replication engine for fresh fan-out. // Only emit when a real proof is present — cached-as-verified // PUTs have no proof to forward, and the chunk would have // already replicated on the original write that carried one. @@ -329,6 +363,68 @@ impl AntProtocol { } } + async fn validate_store_membership( + &self, + address: &[u8; 32], + ) -> std::result::Result<(), ProtocolError> { + #[cfg(any(test, feature = "test-utils"))] + { + let membership_override = *self.test_store_membership_override.read(); + if let Some(is_member) = membership_override { + if is_member { + return Ok(()); + } + return Err(ProtocolError::PaymentFailed(format!( + "ClientPut receiver is not among this node's local {} closest peers for {}", + self.payment_verifier.close_group_size(), + hex::encode(address) + ))); + } + } + + let attached = self.p2p_node.read().as_ref().map(Arc::clone); + let Some(p2p_node) = attached else { + #[cfg(any(test, feature = "test-utils"))] + { + crate::logging::warn!( + "AntProtocol: no P2PNode attached; ClientPut storage \ + responsibility check SKIPPED (test build). Production startup \ + MUST call AntProtocol::attach_p2p_node." + ); + return Ok(()); + } + #[cfg(not(any(test, feature = "test-utils")))] + { + crate::logging::error!( + "AntProtocol: no P2PNode attached; rejecting ClientPut. \ + This is a node-startup bug — AntProtocol::attach_p2p_node \ + must be called before PUT handling runs." + ); + return Err(ProtocolError::PaymentFailed( + "ClientPut rejected: protocol handler is not wired to the \ + P2P layer; cannot verify storage responsibility." + .to_string(), + )); + } + }; + + let self_id = *p2p_node.peer_id(); + let close_group_size = self.payment_verifier.close_group_size(); + let closest = p2p_node + .dht_manager() + .find_closest_nodes_local_with_self(address, close_group_size) + .await; + if closest.iter().any(|node| node.peer_id == self_id) { + return Ok(()); + } + + Err(ProtocolError::PaymentFailed(format!( + "ClientPut receiver {} is not among this node's local {close_group_size} closest peers for {}", + self_id.to_hex(), + hex::encode(address) + ))) + } + /// Handle a GET request. /// /// Wraps `handle_get_inner` to emit a single structured tracing event per @@ -575,6 +671,7 @@ mod tests { let payment_config = PaymentVerifierConfig { evm: EvmVerifierConfig::default(), cache_capacity: 100_000, + close_group_size: crate::ant_protocol::CLOSE_GROUP_SIZE, local_rewards_address: rewards_address, }; let payment_verifier = Arc::new(PaymentVerifier::new(payment_config)); @@ -913,6 +1010,42 @@ mod tests { } } + #[tokio::test] + async fn test_put_rejects_out_of_range_receiver_before_payment_cache() { + const REQUEST_ID: u64 = 105; + + let (protocol, _temp) = create_test_protocol().await; + + let content = b"out of range receiver cache test"; + let address = LmdbStorage::compute_address(content); + protocol.payment_verifier().cache_insert(address); + protocol.set_store_membership_for_tests(false); + + let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content)); + let put_msg = ChunkMessage { + request_id: REQUEST_ID, + body: ChunkMessageBody::PutRequest(put_request), + }; + let put_bytes = put_msg.encode().expect("encode put"); + let response_bytes = protocol + .try_handle_request(&put_bytes) + .await + .expect("handle put") + .expect("expected response"); + let response = ChunkMessage::decode(&response_bytes).expect("decode"); + + assert_eq!(response.request_id, REQUEST_ID); + if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error( + ProtocolError::PaymentFailed(message), + )) = response.body + { + assert!(message.contains("not among this node's local")); + } else { + panic!("expected receiver responsibility rejection, got: {response:?}"); + } + assert!(!protocol.exists(&address).expect("exists check")); + } + #[tokio::test] async fn test_put_same_chunk_twice_hits_cache() { let (protocol, _temp) = create_test_protocol().await; @@ -935,7 +1068,7 @@ mod tests { .await .expect("handle put 1"); - // Second PUT — should return AlreadyExists (checked in storage before payment) + // Second PUT should return AlreadyExists from the storage idempotency check. let response_bytes = protocol .try_handle_request(&put_bytes) .await @@ -945,7 +1078,7 @@ mod tests { if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body { - // expected — storage check comes before payment check + // expected } else { panic!("expected AlreadyExists, got: {response:?}"); } diff --git a/tests/e2e/data_types/chunk.rs b/tests/e2e/data_types/chunk.rs index c208c1d7..d66a4278 100644 --- a/tests/e2e/data_types/chunk.rs +++ b/tests/e2e/data_types/chunk.rs @@ -68,6 +68,7 @@ mod tests { QuotingMetricsTracker, }; use ant_node::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig}; + use ant_node::ReplicationConfig; use evmlib::testnet::Testnet; use evmlib::RewardsAddress; use rand::seq::SliceRandom; @@ -442,6 +443,7 @@ mod tests { let payment_verifier = PaymentVerifier::new(PaymentVerifierConfig { evm: EvmVerifierConfig { network }, cache_capacity: 100, + close_group_size: ReplicationConfig::default().close_group_size, local_rewards_address: rewards_address, }); let metrics_tracker = QuotingMetricsTracker::new(100); diff --git a/tests/e2e/merkle_payment.rs b/tests/e2e/merkle_payment.rs index c6ceb37d..960fc988 100644 --- a/tests/e2e/merkle_payment.rs +++ b/tests/e2e/merkle_payment.rs @@ -23,6 +23,7 @@ use ant_node::compute_address; use ant_node::payment::{ serialize_merkle_proof, MAX_PAYMENT_PROOF_SIZE_BYTES, MIN_PAYMENT_PROOF_SIZE_BYTES, }; +use ant_node::CLOSE_GROUP_SIZE; use bytes::Bytes; use evmlib::common::Amount; use evmlib::merkle_payments::{ @@ -87,6 +88,35 @@ async fn send_put_to_node( ChunkMessage::decode(&response_bytes).map_err(|e| format!("Decode failed: {e}")) } +async fn responsible_receiver_index( + harness: &TestHarness, + address: &[u8; 32], +) -> Result> { + for node in harness.network().nodes() { + let Some(p2p_node) = node.p2p_node.as_ref() else { + continue; + }; + + let self_peer_id = *p2p_node.peer_id(); + let closest = p2p_node + .dht_manager() + .find_closest_nodes_local_with_self(address, CLOSE_GROUP_SIZE) + .await; + if closest + .iter() + .any(|closest_node| closest_node.peer_id == self_peer_id) + { + return Ok(node.index); + } + } + + Err(format!( + "no running node's local view included itself in the closest {CLOSE_GROUP_SIZE} peers for {}", + hex::encode(address) + ) + .into()) +} + /// Create a lightweight test harness with payment enforcement and Anvil wiring. async fn setup_enforcement_env() -> Result<(TestHarness, Testnet), Box> { let testnet = Testnet::new().await?; @@ -555,7 +585,14 @@ async fn test_attack_merkle_pay_yourself_fabricated_pool() -> Result<(), Box Result<(), Box Result<(), Box { + assert!( + reason.contains("Content address mismatch"), + "Should mention content address mismatch, got: {reason}" + ); + } + other => panic!("Expected Rejected, got: {other:?}"), + } + + let protocol_a = node_a.ant_protocol.as_ref().expect("protocol"); + assert!( + !protocol_a.storage().exists(&wrong_address).unwrap_or(false), + "Chunk should not be stored under the wrong address" + ); + assert!( + !protocol_a + .storage() + .exists(&actual_address) + .unwrap_or(false), + "Chunk should not be stored under the actual address after rejected offer" + ); + + harness.teardown().await.expect("teardown"); +} + /// Neighbor sync request returns a sync response (Section 18 #5/#37). /// /// Send a `NeighborSyncRequest` from one node to another and verify we diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index 14216be0..90c73b5a 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -1094,11 +1094,13 @@ impl TestNetwork { // When an EVM network is provided (e.g. Anvil), use it for on-chain verification. // Otherwise default to ArbitrumSepoliaTest for test nodes. let rewards_address = RewardsAddress::new(TEST_REWARDS_ADDRESS); + let replication_config = ReplicationConfig::default(); let payment_config = PaymentVerifierConfig { evm: EvmVerifierConfig { network: evm_network.unwrap_or(EvmNetwork::ArbitrumSepoliaTest), }, cache_capacity: TEST_PAYMENT_CACHE_CAPACITY, + close_group_size: replication_config.close_group_size, local_rewards_address: rewards_address, }; let payment_verifier = PaymentVerifier::new(payment_config); @@ -1182,10 +1184,9 @@ impl TestNetwork { // Start protocol handler that routes incoming P2P messages to AntProtocol if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { - // Wire the P2PNode into the payment verifier so merkle-payment - // verification can run the pay-yourself closeness check against - // the live DHT. - protocol.payment_verifier().attach_p2p_node(Arc::clone(p2p)); + // Wire P2P into AntProtocol for direct PUT responsibility and + // payment-proof closeness checks. + protocol.attach_p2p_node(Arc::clone(p2p)); let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p);