Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/devnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::payment::{
EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
QuotingMetricsTracker,
};
use crate::replication::config::ReplicationConfig;
use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
use evmlib::Network as EvmNetwork;
use evmlib::RewardsAddress;
Expand Down Expand Up @@ -550,9 +551,11 @@ impl Devnet {
};

let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
let replication_config = ReplicationConfig::default();
let payment_config = PaymentVerifierConfig {
evm: evm_config,
cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
close_group_size: replication_config.close_group_size,
local_rewards_address: rewards_address,
};
let payment_verifier = PaymentVerifier::new(payment_config);
Expand Down Expand Up @@ -611,10 +614,9 @@ impl Devnet {
*node.state.write().await = NodeState::Running;

if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
// Wire the P2PNode into the payment verifier for merkle-closeness checks.
protocol
.payment_verifier_arc()
.attach_p2p_node(Arc::clone(p2p));
// Wire P2P into AntProtocol for direct PUT responsibility and
// payment-proof closeness checks.
protocol.attach_p2p_node(Arc::clone(p2p));

let mut events = p2p.subscribe_events();
let p2p_clone = Arc::clone(p2p);
Expand Down
19 changes: 11 additions & 8 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,15 @@ impl NodeBuilder {
Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
};

let repl_config = ReplicationConfig::default();

// Initialize ANT protocol handler for chunk storage and
// wire the fresh-write channel so PUTs trigger replication.
let (ant_protocol, fresh_write_rx) = if self.config.storage.enabled {
let (fresh_write_tx, fresh_write_rx) = tokio::sync::mpsc::unbounded_channel();
let mut protocol = Self::build_ant_protocol(&self.config, &identity).await?;
let mut protocol =
Self::build_ant_protocol(&self.config, &identity, repl_config.close_group_size)
.await?;
protocol.set_fresh_write_sender(fresh_write_tx);
(Some(Arc::new(protocol)), Some(fresh_write_rx))
} else {
Expand All @@ -121,19 +125,16 @@ impl NodeBuilder {

let p2p_arc = Arc::new(p2p_node);

// Wire the P2PNode handle into the payment verifier so merkle-payment
// checks can query the live DHT for peers actually closest to a pool
// midpoint (pay-yourself defence).
// Wire the P2PNode handle into AntProtocol so direct PUTs can verify
// close-group responsibility and payment proofs can query live-DHT
// closeness.
if let Some(ref protocol) = ant_protocol {
protocol
.payment_verifier_arc()
.attach_p2p_node(Arc::clone(&p2p_arc));
protocol.attach_p2p_node(Arc::clone(&p2p_arc));
}

// Initialize replication engine (if storage is enabled)
let replication_engine =
if let (Some(ref protocol), Some(fresh_rx)) = (&ant_protocol, fresh_write_rx) {
let repl_config = ReplicationConfig::default();
let storage_arc = protocol.storage();
let payment_verifier_arc = protocol.payment_verifier_arc();
match ReplicationEngine::new(
Expand Down Expand Up @@ -349,6 +350,7 @@ impl NodeBuilder {
async fn build_ant_protocol(
config: &NodeConfig,
identity: &NodeIdentity,
close_group_size: usize,
) -> Result<AntProtocol> {
// Create LMDB storage
let storage_config = LmdbStorageConfig {
Expand Down Expand Up @@ -378,6 +380,7 @@ impl NodeBuilder {
network: evm_network,
},
cache_capacity: config.payment.cache_capacity,
close_group_size,
local_rewards_address: rewards_address,
};
let payment_verifier = PaymentVerifier::new(payment_config);
Expand Down
125 changes: 95 additions & 30 deletions src/payment/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,33 @@ const DEFAULT_CACHE_CAPACITY: usize = 100_000;
/// This cache stores `XorName` values that have been verified to exist on the
/// autonomi network, avoiding repeated network queries for the same data.
///
/// Each entry carries a flag recording whether the verification that inserted
/// it ran the full client-PUT check set (`true`) or only the
/// receipt-authenticity subset used for replication (`false`). A
/// replication-verified entry must not satisfy a later client-PUT fast-path —
/// the context-gated checks (own-quote freshness, local recipient, merkle
/// candidate closeness) were never run for it — while either kind of entry
/// satisfies a later replication check.
/// Each entry records which fresh proof verification level inserted it. A
/// paid-list entry must not satisfy a later client-PUT fast-path because
/// paid-list admission does not authorize storing the actual chunk. Stronger
/// entries satisfy weaker lookups.
#[derive(Clone)]
pub struct VerifiedCache {
/// Value: `true` if the entry was verified under the full client-PUT
/// check set, `false` if only under the replication subset.
inner: Arc<Mutex<LruCache<XorName, bool>>>,
inner: Arc<Mutex<LruCache<XorName, VerificationLevel>>>,
hits: Arc<AtomicU64>,
misses: Arc<AtomicU64>,
additions: Arc<AtomicU64>,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum VerificationLevel {
PaidList,
ClientPut,
}

impl VerificationLevel {
fn satisfies(self, required: Self) -> bool {
matches!(
(self, required),
(Self::PaidList, Self::PaidList) | (Self::ClientPut, Self::PaidList | Self::ClientPut)
)
}
}

/// Cache statistics for monitoring.
#[derive(Debug, Default, Clone, Copy)]
pub struct CacheStats {
Expand Down Expand Up @@ -86,11 +96,10 @@ impl VerifiedCache {
}
}

/// Check if a `XorName` is in the cache (verified under either check set).
/// Check if a `XorName` is in the cache (verified under any fresh check set).
///
/// Returns `true` if the `XorName` is cached (verified to exist on autonomi).
/// Sufficient for replication-context lookups; client-PUT lookups must use
/// [`Self::contains_client_put_verified`].
/// Paid-list and client-PUT lookups must use their stricter helpers.
#[must_use]
pub fn contains(&self, xorname: &XorName) -> bool {
let found = self.inner.lock().get(xorname).is_some();
Expand All @@ -104,14 +113,42 @@ impl VerifiedCache {
found
}

/// Check if a `XorName` is cached AND its verification ran at least the
/// paid-list admission check set.
///
/// A client-PUT entry returns `true` here because it passed the stricter
/// store-admission path at the caller.
#[must_use]
pub fn contains_paid_list_verified(&self, xorname: &XorName) -> bool {
let found = self
.inner
.lock()
.get(xorname)
.copied()
.is_some_and(|level| level.satisfies(VerificationLevel::PaidList));

if found {
self.hits.fetch_add(1, Ordering::Relaxed);
} else {
self.misses.fetch_add(1, Ordering::Relaxed);
}

found
}

/// Check if a `XorName` is cached AND its verification ran the full
/// client-PUT check set.
/// client-PUT store-admission check set.
///
/// A replication-verified entry returns `false` here: it never passed the
/// client-PUT-only checks, so it must not let a later client PUT skip them.
/// Paid-list entries return `false` here because they did not pass the
/// client-PUT store-admission path.
#[must_use]
pub fn contains_client_put_verified(&self, xorname: &XorName) -> bool {
let found = self.inner.lock().get(xorname).copied() == Some(true);
let found = self
.inner
.lock()
.get(xorname)
.copied()
.is_some_and(|level| level.satisfies(VerificationLevel::ClientPut));

if found {
self.hits.fetch_add(1, Ordering::Relaxed);
Expand All @@ -125,27 +162,32 @@ impl VerifiedCache {
/// Add a `XorName` verified under the full client-PUT check set.
///
/// This should be called after verifying that data exists on the autonomi network.
/// Also upgrades an existing replication-verified entry.
/// Also upgrades an existing paid-list-verified entry.
pub fn insert(&self, xorname: XorName) {
self.inner.lock().put(xorname, true);
self.additions.fetch_add(1, Ordering::Relaxed);
self.insert_with_level(xorname, VerificationLevel::ClientPut);
}

/// Add a `XorName` verified under the replication (receipt-authenticity)
/// subset only.
/// Add a `XorName` verified under paid-list admission checks.
///
/// Never downgrades an existing client-PUT-verified entry — the stronger
/// verification already happened, and replication re-offers of the same
/// key are routine.
pub fn insert_replication_verified(&self, xorname: XorName) {
/// Never downgrades an existing client-PUT-verified entry.
pub fn insert_paid_list_verified(&self, xorname: XorName) {
self.insert_with_level(xorname, VerificationLevel::PaidList);
}

fn insert_with_level(&self, xorname: XorName, level: VerificationLevel) {
let added = {
let mut inner = self.inner.lock();
// `get_mut` refreshes LRU recency for existing entries of either kind.
if inner.get_mut(&xorname).is_none() {
inner.put(xorname, false);
true
} else {
if inner.get(&xorname).is_some() {
if let Some(existing) = inner.get_mut(&xorname) {
if !existing.satisfies(level) {
*existing = level;
}
}
false
} else {
inner.put(xorname, level);
true
}
};
if added {
Expand Down Expand Up @@ -216,6 +258,29 @@ mod tests {
assert_eq!(cache.len(), 2);
}

#[test]
fn test_cache_verification_levels_do_not_downgrade_or_over_authorize() {
let cache = VerifiedCache::new();
let paid_list = [2u8; 32];
let client_put = [3u8; 32];

cache.insert_paid_list_verified(paid_list);
assert!(cache.contains(&paid_list));
assert!(cache.contains_paid_list_verified(&paid_list));
assert!(!cache.contains_client_put_verified(&paid_list));

cache.insert(paid_list);
assert!(cache.contains_client_put_verified(&paid_list));

cache.insert(client_put);
assert!(cache.contains(&client_put));
assert!(cache.contains_paid_list_verified(&client_put));
assert!(cache.contains_client_put_verified(&client_put));

cache.insert_paid_list_verified(client_put);
assert!(cache.contains_client_put_verified(&client_put));
}

#[test]
fn test_cache_stats() {
let cache = VerifiedCache::new();
Expand Down
35 changes: 17 additions & 18 deletions src/payment/quote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ pub struct QuoteGenerator {
///
/// When attached, quote prices are computed from
/// [`LmdbStorage::current_chunks()`] — the **same** count the
/// [`PaymentVerifier`](crate::payment::PaymentVerifier) freshness gate
/// compares the quote against. Keeping pricing and freshness on one source
/// means a quote priced at record count `N` is later checked against a
/// current count that differs only by genuine in-flight growth, instead of
/// by the standing client-PUT-vs-replication gap that rejected every
/// payment when pricing read the side counter and freshness read the store.
/// [`PaymentVerifier`](crate::payment::PaymentVerifier) price-floor check
/// compares the paid quote against. Keeping pricing and verification on one
/// source means a quote priced at record count `N` is later checked against
/// a current count that differs only by genuine in-flight growth, instead of
/// by a side-counter-vs-store gap.
/// `None` until [`Self::attach_storage`] is called.
storage: RwLock<Option<Arc<LmdbStorage>>>,
/// Signing function provided by the node.
Expand Down Expand Up @@ -84,10 +83,10 @@ impl QuoteGenerator {
/// authoritative on-disk record count.
///
/// This MUST be wired to the same `LmdbStorage` the
/// [`PaymentVerifier`](crate::payment::PaymentVerifier) freshness gate reads
/// via `current_chunks()`; otherwise pricing and freshness diverge and the
/// gate rejects healthy payments. Idempotent: calling twice replaces the
/// handle. Uses interior mutability so it can be called on an `Arc`.
/// [`PaymentVerifier`](crate::payment::PaymentVerifier) price-floor check
/// reads via `current_chunks()`; otherwise pricing and verification diverge
/// and healthy payments can be rejected. Idempotent: calling twice replaces
/// the handle. Uses interior mutability so it can be called on an `Arc`.
pub fn attach_storage(&self, storage: Arc<LmdbStorage>) {
*self.storage.write() = Some(storage);
debug!("QuoteGenerator: LmdbStorage attached for current-records pricing");
Expand All @@ -97,7 +96,7 @@ impl QuoteGenerator {
///
/// Prefers the attached `LmdbStorage` count (authoritative — counts client
/// PUTs, replication stores, and repair fetches alike, exactly matching the
/// verifier's freshness source). Falls back to the in-memory
/// verifier's price-floor source). Falls back to the in-memory
/// `metrics_tracker` when no storage is attached or the read fails, so
/// pricing never panics or stalls.
fn pricing_records_stored(&self) -> usize {
Expand Down Expand Up @@ -184,7 +183,7 @@ impl QuoteGenerator {
let timestamp = SystemTime::now();

// Calculate price from the authoritative current record count (the same
// count the verifier's freshness gate reads), falling back to the
// count the verifier's price-floor check reads), falling back to the
// in-memory counter only when no storage is attached.
let price = calculate_price(self.pricing_records_stored());

Expand Down Expand Up @@ -370,13 +369,13 @@ mod tests {
generator
}

/// Regression test for the STG-01 quote-freshness rejection: pricing must
/// read the attached store's `current_chunks()`, NOT the side counter.
/// Regression test for the STG-01 quote-pricing mismatch: pricing must read
/// the attached store's `current_chunks()`, NOT the side counter.
///
/// Before the fix, the price came from `metrics_tracker` (client-PUT count
/// only) while the verifier's freshness gate read `current_chunks()` (all
/// records, including replicated ones). On a replicating network the store
/// count ran far ahead of the side counter, so every quote looked "stale".
/// only) while verifier checks read `current_chunks()` (all records,
/// including replicated ones). On a replicating network the store count ran
/// far ahead of the side counter, so every quote looked underpriced.
/// Here we attach a store, write records WITHOUT touching the side counter
/// (mimicking replication stores), and assert the quote prices off the
/// store count — i.e. the two sources now agree.
Expand Down Expand Up @@ -441,7 +440,7 @@ mod tests {
derive_records_stored_from_price(quote.price),
25,
"verifier's price-inverse must recover the store count, keeping the \
freshness delta at ~0 for a freshly issued quote"
local price comparison aligned for a freshly issued quote"
);
}

Expand Down
Loading
Loading