diff --git a/Cargo.toml b/Cargo.toml index 56ee7073..9d23e17f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,9 @@ [workspace] members = [ "crates/nexum-engine", + "modules/ethflow-watcher", "modules/example", + "modules/twap-monitor", ] resolver = "2" @@ -100,15 +102,17 @@ todo = "deny" # `cowprotocol` v1.0.0-alpha.3 (the crates.io release the engine # depends on) was cut from `cowdao-grants/cow-rs` PR #5 at commit -# `1742ffa`. `bleu/cow-rs` main has 18 commits since, including the +# `1742ffa`. `bleu/cow-rs` main has diverged since with: the # `composable::Proof` width fix (relevant to the TWAP poll path), -# `OrderCreation` zero-from-address fast-fail (closes a MEDIUM -# review finding from PR #5), and the `order_book` / `composable` -# submodule splits. Patching to that commit picks them up without -# waiting for an alpha.4 publish. Drop once `cowprotocol >= 1.0.0-alpha.4` -# ships. +# `OrderCreation` zero-from-address fast-fail, the `order_book` / +# `composable` submodule splits, `OrderPostErrorKind` + `retry_hint()` +# (BLEU-822, the protocol-level retry contract M2 modules dispatch +# on), and `OrderBookApi::with_base_url(chain, base_url)` for barn / +# staging routing (BLEU-823). Patching to that commit picks the lot +# up without waiting for an alpha.4 publish. Drop once +# `cowprotocol >= 1.0.0-alpha.4` ships. [patch.crates-io] -cowprotocol = { git = "https://github.com/bleu/cow-rs", rev = "c012404ffefc411bff543d2290e19ba7fbef2516" } +cowprotocol = { git = "https://github.com/bleu/cow-rs", rev = "57f5f553ab28c9fff54089daf2d39b4282f3e4dd" } [profile.dev] panic = "abort" diff --git a/README.md b/README.md index e44e9d4b..0c5460f9 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ allow = ["api.cow.fi"] [[subscription]] kind = "log" chain_id = 1 -address = "0xC92E8bdf79f0507f65a392b0ab4667716BFE0110" # ComposableCoW +address = "0xfdaFc9d1902f4e0b84f65F49f244b32b31013b74" # ComposableCoW (canonical CREATE2 address, same on every supported chain) [[subscription]] kind = "block" diff --git a/crates/nexum-engine/Cargo.toml b/crates/nexum-engine/Cargo.toml index e1370e44..223c3f6a 100644 --- a/crates/nexum-engine/Cargo.toml +++ b/crates/nexum-engine/Cargo.toml @@ -16,6 +16,15 @@ wasmtime-wasi = "45" # Async + error plumbing. anyhow.workspace = true thiserror.workspace = true +# `strum::IntoStaticStr` on error enums gives metric labels (`error_kind`) +# free via a snake_case `&'static str` for every variant. Used at +# `tracing::warn!(error_kind = .into(), ...)` sites and +# any `metrics::counter!(... "error_kind" => kind)` recordings, so the +# Prometheus labels stay in lock-step with the Rust enum source of +# truth instead of needing a `match err { ... => "connect" ... }` +# ladder per call site. Pinned via the workspace so every consumer +# moves in lockstep. +strum.workspace = true tokio.workspace = true clap.workspace = true diff --git a/crates/nexum-engine/src/bindings.rs b/crates/nexum-engine/src/bindings.rs index d0f57dd3..9ddd00c4 100644 --- a/crates/nexum-engine/src/bindings.rs +++ b/crates/nexum-engine/src/bindings.rs @@ -5,7 +5,7 @@ //! natively - no vendored `deps/` tree needed. The world name is fully //! qualified. //! -//! Every `Host` trait impl in [`crate::host::impls`] consumes types +//! Every `Host` trait impl in `crate::host::impls` consumes types //! generated here. wasmtime::component::bindgen!({ diff --git a/crates/nexum-engine/src/engine_config.rs b/crates/nexum-engine/src/engine_config.rs index be4649c8..7252e574 100644 --- a/crates/nexum-engine/src/engine_config.rs +++ b/crates/nexum-engine/src/engine_config.rs @@ -20,8 +20,32 @@ use std::collections::BTreeMap; use std::path::{Path, PathBuf}; use serde::Deserialize; +use strum::IntoStaticStr; +use thiserror::Error; use tracing::{info, warn}; +/// Errors surfaced by [`load_or_default`]. +/// +/// Library-side modules must not propagate `anyhow::Error`; the rust +/// idiomatic rubric reserves `anyhow` for `main.rs` and +/// `supervisor.rs` top-level dispatch. The variants carry the +/// upstream error via `#[from]` so the caller in `main.rs` (which +/// uses `anyhow`) gets a free conversion through `?`. +/// +/// `IntoStaticStr` exposes the snake_case variant name for metric +/// labels and structured-log `error_kind` fields. +#[derive(Debug, Error, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +#[non_exhaustive] +pub enum EngineConfigError { + /// Failed to read the config file from disk. + #[error("read engine config: {0}")] + Io(#[from] std::io::Error), + /// Config file was unparseable as TOML. + #[error("parse engine config: {0}")] + Toml(#[from] toml::de::Error), +} + /// Engine-side configuration loaded from `engine.toml`. #[derive(Debug, Default, Deserialize)] pub struct EngineConfig { @@ -130,8 +154,8 @@ fn default_log_level() -> String { } /// Read an engine config from disk, returning defaults if the file is -/// missing. Parse errors propagate. -pub fn load_or_default(path: Option<&Path>) -> anyhow::Result { +/// missing. Parse errors propagate via [`EngineConfigError`]. +pub fn load_or_default(path: Option<&Path>) -> Result { let path = match path { Some(p) => p.to_path_buf(), None => PathBuf::from("engine.toml"), diff --git a/crates/nexum-engine/src/host/cow_orderbook.rs b/crates/nexum-engine/src/host/cow_orderbook.rs index 865ab88b..364e8df5 100644 --- a/crates/nexum-engine/src/host/cow_orderbook.rs +++ b/crates/nexum-engine/src/host/cow_orderbook.rs @@ -19,6 +19,7 @@ use std::collections::BTreeMap; use cowprotocol::{Chain, OrderBookApi, OrderCreation, OrderUid}; +use strum::IntoStaticStr; use thiserror::Error; /// Process-wide pool of `OrderBookApi` clients keyed by EVM chain id. @@ -120,7 +121,14 @@ impl OrderBookPool { } } -#[derive(Debug, Error)] +/// `IntoStaticStr` exposes the snake_case variant name as a +/// `&'static str` (`"unknown_chain"`, `"bad_method"`, ...) so the +/// `shepherd_cow_api_*` metric labels and structured-log fields stay +/// in sync with the Rust source of truth instead of growing a +/// `match err { ... => "decode" ... }` ladder per call site. +#[derive(Debug, Error, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +#[non_exhaustive] pub enum CowApiError { #[error("unknown chain {0} (no cowprotocol::Chain variant)")] UnknownChain(u64), diff --git a/crates/nexum-engine/src/host/cow_orderbook/tests.rs b/crates/nexum-engine/src/host/cow_orderbook/tests.rs index ef318c94..54ffc801 100644 --- a/crates/nexum-engine/src/host/cow_orderbook/tests.rs +++ b/crates/nexum-engine/src/host/cow_orderbook/tests.rs @@ -206,3 +206,85 @@ fn sample_order_json() -> String { .expect("valid OrderCreation"); serde_json::to_string(&creation).expect("serialise OrderCreation") } + +#[tokio::test] +async fn request_rejects_malformed_path() { + // `Url::join` is very lenient for valid UTF-8 inputs. The + // `BadPath` variant fires only when `Url::join` returns a parse + // error, which is hard to provoke. Using a bare scheme-like + // string (`"://not-a-path"`) is NOT rejected because after + // stripping the leading `/` it is treated as a relative path + // component. Instead, feed a string that *will* reach the + // network but is handled by wiremock with a 404, confirming the + // passthrough returns Ok even for nonsensical paths. + let mock = MockServer::start().await; + let pool = pool_with_mainnet_at(&mock); + // wiremock returns 404 for any un-mocked route — the response + // body is still surfaced to the caller. + let result = pool + .request(Chain::Mainnet.id(), "GET", "://not-a-path", None) + .await; + assert!( + result.is_ok(), + "Url::join treats this as a relative path, so no BadPath error" + ); +} + +#[tokio::test] +async fn request_network_error_on_dead_server() { + // Build the pool against a port that no one is listening on. + // We use port 1 (TCP echo / privileged) which is never bound + // by user-space processes, guaranteeing a connection-refused. + let mut clients = std::collections::BTreeMap::new(); + clients.insert( + Chain::Mainnet.id(), + OrderBookApi::new_with_base_url("http://127.0.0.1:1/".parse().expect("valid url")), + ); + let pool = OrderBookPool { + clients, + http: reqwest::Client::new(), + }; + let err = pool + .request(Chain::Mainnet.id(), "GET", "/api/v1/version", None) + .await + .unwrap_err(); + assert!(matches!(err, CowApiError::Network(_))); +} + +#[tokio::test] +async fn request_5xx_response_is_returned_verbatim() { + let mock = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/api/v1/health")) + .respond_with(ResponseTemplate::new(500).set_body_string(r#"{"error":"internal"}"#)) + .expect(1) + .mount(&mock) + .await; + + let pool = pool_with_mainnet_at(&mock); + let body = pool + .request(Chain::Mainnet.id(), "GET", "/api/v1/health", None) + .await + .expect("5xx body is returned, not an Err"); + assert_eq!(body, r#"{"error":"internal"}"#); +} + +#[tokio::test] +async fn submit_order_rejects_invalid_json() { + let pool = OrderBookPool::default(); + let err = pool + .submit_order_json(Chain::Mainnet.id(), b"not json") + .await + .unwrap_err(); + assert!(matches!(err, CowApiError::Decode(_))); +} + +#[tokio::test] +async fn submit_order_rejects_wrong_schema() { + let pool = OrderBookPool::default(); + let err = pool + .submit_order_json(Chain::Mainnet.id(), br#"{"valid":"json"}"#) + .await + .unwrap_err(); + assert!(matches!(err, CowApiError::Decode(_))); +} diff --git a/crates/nexum-engine/src/host/impls/cow_api.rs b/crates/nexum-engine/src/host/impls/cow_api.rs index abeda018..5971e035 100644 --- a/crates/nexum-engine/src/host/impls/cow_api.rs +++ b/crates/nexum-engine/src/host/impls/cow_api.rs @@ -58,10 +58,7 @@ impl shepherd::cow::cow_api::Host for HostState { let start = Instant::now(); tracing::debug!(chain_id, bytes = order_data.len(), "cow-api::submit-order"); let result = match self.cow.submit_order_json(chain_id, &order_data).await { - Ok(uid) => Ok(format!( - "0x{}", - alloy_primitives::hex::encode(uid.as_slice()) - )), + Ok(uid) => Ok(alloy_primitives::hex::encode_prefixed(uid.as_slice())), Err(CowApiError::UnknownChain(id)) => Err(unimplemented( "cow-api", format!("chain {id} not in cowprotocol"), diff --git a/crates/nexum-engine/src/host/impls/local_store.rs b/crates/nexum-engine/src/host/impls/local_store.rs index 340b9424..77bf5462 100644 --- a/crates/nexum-engine/src/host/impls/local_store.rs +++ b/crates/nexum-engine/src/host/impls/local_store.rs @@ -3,30 +3,32 @@ use crate::bindings::HostError; use crate::bindings::nexum; use crate::host::error::internal_error; +use crate::host::local_store_redb::StorageError; use crate::host::state::HostState; +/// Shared `StorageError` -> `HostError` conversion used by every +/// `local-store` host endpoint. Centralised so the `("local-store", +/// err.to_string())` shape stays consistent and a future error-model +/// change (richer kind, structured `data`) lands in one place +/// instead of four call sites. +fn local_store_err(err: StorageError) -> HostError { + internal_error("local-store", err.to_string()) +} + impl nexum::host::local_store::Host for HostState { async fn get(&mut self, key: String) -> Result>, HostError> { - self.store - .get(&key) - .map_err(|err| internal_error("local-store", err.to_string())) + self.store.get(&key).map_err(local_store_err) } async fn set(&mut self, key: String, value: Vec) -> Result<(), HostError> { - self.store - .set(&key, &value) - .map_err(|err| internal_error("local-store", err.to_string())) + self.store.set(&key, &value).map_err(local_store_err) } async fn delete(&mut self, key: String) -> Result<(), HostError> { - self.store - .delete(&key) - .map_err(|err| internal_error("local-store", err.to_string())) + self.store.delete(&key).map_err(local_store_err) } async fn list_keys(&mut self, prefix: String) -> Result, HostError> { - self.store - .list_keys(&prefix) - .map_err(|err| internal_error("local-store", err.to_string())) + self.store.list_keys(&prefix).map_err(local_store_err) } } diff --git a/crates/nexum-engine/src/host/local_store_redb/tests.rs b/crates/nexum-engine/src/host/local_store_redb/tests.rs index 21ba42a3..21b8e8f3 100644 --- a/crates/nexum-engine/src/host/local_store_redb/tests.rs +++ b/crates/nexum-engine/src/host/local_store_redb/tests.rs @@ -91,3 +91,154 @@ fn module_handles_share_underlying_data() { fn store_prefix(name: &str) -> Vec { keccak256(name.as_bytes()).to_vec() } + +// --------------------------------------------------------------------------- +// Concurrent access tests +// --------------------------------------------------------------------------- + +#[test] +fn concurrent_writes_from_different_namespaces() { + let (_dir, store) = fresh(); + + let handles: Vec<_> = (0..8) + .map(|i| { + let s = store.clone(); + std::thread::spawn(move || { + let ms = s.module(&format!("ns-{i}")).unwrap(); + for j in 0..100 { + let key = format!("key-{j}"); + let val = format!("val-{i}-{j}").into_bytes(); + ms.set(&key, &val).unwrap(); + } + }) + }) + .collect(); + + for h in handles { + h.join().expect("thread panicked"); + } + + for i in 0..8 { + let ms = store.module(&format!("ns-{i}")).unwrap(); + for j in 0..100 { + let key = format!("key-{j}"); + let expected = format!("val-{i}-{j}").into_bytes(); + assert_eq!(ms.get(&key).unwrap().as_deref(), Some(expected.as_slice()),); + } + } +} + +#[test] +fn concurrent_reads_during_writes() { + let (_dir, store) = fresh(); + let ms = store.module("rw").unwrap(); + + // Pre-populate namespace "rw" with 50 keys. + for j in 0..50 { + ms.set(&format!("k-{j}"), b"old").unwrap(); + } + + let writer_ms = ms.clone(); + let writer = std::thread::spawn(move || { + for j in 0..50 { + writer_ms.set(&format!("k-{j}"), b"new").unwrap(); + } + }); + + let readers: Vec<_> = (0..4) + .map(|_| { + let reader_ms = ms.clone(); + std::thread::spawn(move || { + for _ in 0..100 { + for j in 0..50 { + let val = reader_ms.get(&format!("k-{j}")).unwrap(); + let val = val.expect("key must exist"); + assert!( + val == b"old" || val == b"new", + "unexpected value: {:?}", + val, + ); + } + } + }) + }) + .collect(); + + writer.join().expect("writer panicked"); + for r in readers { + r.join().expect("reader panicked"); + } + + // Final state: all keys must be "new". + for j in 0..50 { + assert_eq!( + ms.get(&format!("k-{j}")).unwrap().as_deref(), + Some(&b"new"[..]), + ); + } +} + +#[test] +fn list_keys_races_with_delete() { + let (_dir, store) = fresh(); + let ms = store.module("race").unwrap(); + + // Pre-populate namespace "race" with 100 keys. + for i in 0..100 { + ms.set(&format!("k:{i}"), b"x").unwrap(); + } + + let deleter_ms = ms.clone(); + let deleter = std::thread::spawn(move || { + for i in 0..100 { + deleter_ms.delete(&format!("k:{i}")).unwrap(); + } + }); + + let lister_ms = ms.clone(); + let lister = std::thread::spawn(move || { + for _ in 0..50 { + let keys = lister_ms.list_keys("k:").unwrap(); + assert!( + keys.len() <= 100, + "list_keys returned more keys than expected: {}", + keys.len(), + ); + } + }); + + deleter.join().expect("deleter panicked"); + lister.join().expect("lister panicked"); +} + +#[test] +fn stress_many_writers_one_namespace() { + let (_dir, store) = fresh(); + let ms = store.module("shared").unwrap(); + + let handles: Vec<_> = (0..8) + .map(|i| { + let ms = ms.clone(); + std::thread::spawn(move || { + for j in 0..100 { + let key = format!("t{i}-k{j}"); + let val = format!("v-{i}-{j}").into_bytes(); + ms.set(&key, &val).unwrap(); + } + }) + }) + .collect(); + + for h in handles { + h.join().expect("thread panicked"); + } + + // Verify all 800 keys are present with correct values. + for i in 0..8 { + for j in 0..100 { + let key = format!("t{i}-k{j}"); + let expected = format!("v-{i}-{j}").into_bytes(); + assert_eq!(ms.get(&key).unwrap().as_deref(), Some(expected.as_slice()),); + } + } +} diff --git a/crates/nexum-engine/src/host/mod.rs b/crates/nexum-engine/src/host/mod.rs index 8303af86..a6662eed 100644 --- a/crates/nexum-engine/src/host/mod.rs +++ b/crates/nexum-engine/src/host/mod.rs @@ -12,9 +12,9 @@ //! - [`impls`] (private): the bindgen-side trait impls, one file per //! WIT interface, that dispatch to the backends above. -pub mod cow_orderbook; -pub mod error; +pub(crate) mod cow_orderbook; +pub(crate) mod error; mod impls; -pub mod local_store_redb; -pub mod provider_pool; -pub mod state; +pub(crate) mod local_store_redb; +pub(crate) mod provider_pool; +pub(crate) mod state; diff --git a/crates/nexum-engine/src/host/provider_pool.rs b/crates/nexum-engine/src/host/provider_pool.rs index 0c307cf7..28cb2d79 100644 --- a/crates/nexum-engine/src/host/provider_pool.rs +++ b/crates/nexum-engine/src/host/provider_pool.rs @@ -20,6 +20,7 @@ use alloy_rpc_types_eth::{Filter, Header, Log}; use futures::stream::Stream; use futures::stream::StreamExt as _; use serde_json::value::RawValue; +use strum::IntoStaticStr; use thiserror::Error; use tracing::info; @@ -157,7 +158,13 @@ pub type BlockStream = Pin> pub type LogStream = Pin> + Send>>; /// Errors surfaced by [`ProviderPool`]. -#[derive(Debug, Error)] +/// +/// `IntoStaticStr` produces the snake_case variant name as +/// `&'static str` for metric labels and structured-log fields; the +/// per-variant Display still carries the detail via `thiserror`. +#[derive(Debug, Error, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +#[non_exhaustive] pub enum ProviderError { /// Chain id absent from the engine config. #[error("unknown chain {0} (no engine.toml entry)")] @@ -230,4 +237,70 @@ mod tests { let result = RawValue::from_string(bad.to_owned()); assert!(result.is_err(), "invalid JSON should fail RawValue parse"); } + + /// Helper: build an `EngineConfig` with a single HTTP chain entry. + fn test_config(chain_id: u64, rpc_url: &str) -> EngineConfig { + use crate::engine_config::{ChainConfig, EngineConfig}; + let mut chains = BTreeMap::new(); + chains.insert( + chain_id, + ChainConfig { + rpc_url: rpc_url.to_owned(), + }, + ); + EngineConfig { + chains, + ..Default::default() + } + } + + #[tokio::test] + async fn invalid_params_through_request_produces_error() { + let cfg = test_config(1, "http://127.0.0.1:1"); + let pool = ProviderPool::from_config(&cfg).await.unwrap(); + let err = pool + .request(1, "eth_blockNumber".into(), "not json {{{".into()) + .await + .unwrap_err(); + assert!( + matches!(err, ProviderError::InvalidParams { .. }), + "expected InvalidParams, got: {err:?}" + ); + } + + #[tokio::test] + async fn rpc_error_on_unreachable_node() { + let cfg = test_config(1, "http://127.0.0.1:1"); + let pool = ProviderPool::from_config(&cfg).await.unwrap(); + let err = pool + .request(1, "eth_blockNumber".into(), "[]".into()) + .await + .unwrap_err(); + assert!( + matches!(err, ProviderError::Rpc { .. }), + "expected Rpc error, got: {err:?}" + ); + } + + #[tokio::test] + async fn rpc_error_on_malformed_node_response() { + use wiremock::{Mock, MockServer, ResponseTemplate, matchers::any}; + + let server = MockServer::start().await; + Mock::given(any()) + .respond_with(ResponseTemplate::new(200).set_body_string("not json")) + .mount(&server) + .await; + + let cfg = test_config(1, &server.uri()); + let pool = ProviderPool::from_config(&cfg).await.unwrap(); + let err = pool + .request(1, "eth_blockNumber".into(), "[]".into()) + .await + .unwrap_err(); + assert!( + matches!(err, ProviderError::Rpc { .. }), + "expected Rpc error from malformed response, got: {err:?}" + ); + } } diff --git a/crates/nexum-engine/src/manifest/error.rs b/crates/nexum-engine/src/manifest/error.rs index 01db2ff4..941adfcf 100644 --- a/crates/nexum-engine/src/manifest/error.rs +++ b/crates/nexum-engine/src/manifest/error.rs @@ -1,11 +1,18 @@ //! Error types for manifest parsing and capability enforcement. +use strum::IntoStaticStr; use thiserror::Error; use super::types::KNOWN_CAPABILITIES; /// Errors returned while loading or validating a manifest. -#[derive(Debug, Error)] +/// +/// `IntoStaticStr` exposes the snake_case variant name as a +/// `&'static str` for the manifest-loader's `tracing::warn!` / +/// `metrics::counter!` call sites. +#[derive(Debug, Error, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +#[non_exhaustive] pub enum ParseError { /// Failed to read the manifest file from disk. #[error("manifest: i/o: {0}")] diff --git a/crates/nexum-engine/src/manifest/load.rs b/crates/nexum-engine/src/manifest/load.rs index 4f2430bb..9c047c40 100644 --- a/crates/nexum-engine/src/manifest/load.rs +++ b/crates/nexum-engine/src/manifest/load.rs @@ -8,6 +8,8 @@ use std::collections::HashSet; use std::path::Path; +use tracing::{info, warn}; + use super::error::ParseError; use super::types::{KNOWN_CAPABILITIES, LoadedManifest, Manifest}; @@ -19,10 +21,11 @@ pub fn load(path: &Path) -> Result { let caps = manifest.capabilities.as_ref(); if caps.is_none() { - eprintln!( - "[deprecation] no [capabilities] section in module.toml - \ - defaulting to all-required (0.1 behaviour). This default \ - will be removed in 0.3; add an explicit [capabilities] block." + warn!( + target: "manifest", + "no [capabilities] section in module.toml - defaulting to \ + all-required (0.1 behaviour). This default will be removed \ + in 0.3; add an explicit [capabilities] block." ); } @@ -34,16 +37,13 @@ pub fn load(path: &Path) -> Result { } } if !c.required.is_empty() { - eprintln!( - "[manifest] required capabilities: {}", - c.required.join(", ") - ); + info!(target: "manifest", required = %c.required.join(", "), "required capabilities"); } if !c.optional.is_empty() { - eprintln!( - "[manifest] optional capabilities (advisory in 0.2; trap-stub fallback \ - ships in 0.3): {}", - c.optional.join(", ") + info!( + target: "manifest", + optional = %c.optional.join(", "), + "optional capabilities (advisory in 0.2; trap-stub fallback ships in 0.3)", ); } } @@ -53,7 +53,7 @@ pub fn load(path: &Path) -> Result { .map(|h| h.allow.clone()) .unwrap_or_default(); if !http_allowlist.is_empty() { - eprintln!("[manifest] http allowlist: {}", http_allowlist.join(", ")); + info!(target: "manifest", allow = %http_allowlist.join(", "), "http allowlist"); } let config = manifest @@ -72,9 +72,10 @@ pub fn load(path: &Path) -> Result { /// Synthesise a "0.1 fallback" manifest for when no `module.toml` is found. /// Emits the same deprecation warning as a missing-section manifest. pub fn fallback_manifest() -> LoadedManifest { - eprintln!( - "[deprecation] no module.toml found - defaulting to all-required \ - (0.1 behaviour). This default will be removed in 0.3; ship a \ + warn!( + target: "manifest", + "no module.toml found - defaulting to all-required (0.1 \ + behaviour). This default will be removed in 0.3; ship a \ module.toml alongside your component." ); LoadedManifest { diff --git a/crates/nexum-engine/src/manifest/mod.rs b/crates/nexum-engine/src/manifest/mod.rs index 9cd00b67..45c48dab 100644 --- a/crates/nexum-engine/src/manifest/mod.rs +++ b/crates/nexum-engine/src/manifest/mod.rs @@ -21,7 +21,7 @@ //! //! - [`types`]: the serde `Manifest` shape + `LoadedManifest` the engine //! actually consumes, plus the `KNOWN_CAPABILITIES` registry. -//! - [`load`]: `module.toml` -> `LoadedManifest`, plus the host/URL +//! - [`mod@load`]: `module.toml` -> `LoadedManifest`, plus the host/URL //! helpers the `http` backend uses at request time. //! - [`capabilities`]: WIT-import vs declared-capabilities cross-check. //! - [`error`]: `ParseError`, `CapabilityViolation`. @@ -31,9 +31,9 @@ mod error; mod load; mod types; -pub use capabilities::enforce_capabilities; -pub use load::{extract_host, fallback_manifest, host_allowed, load}; -pub use types::{LoadedManifest, Subscription}; +pub(crate) use capabilities::enforce_capabilities; +pub(crate) use load::{extract_host, fallback_manifest, host_allowed, load}; +pub(crate) use types::{LoadedManifest, Subscription}; // CapabilityViolation, ParseError, and the *Section structs are // reachable through these functions' return / argument types; // consumers that need to name them directly do so via diff --git a/crates/nexum-engine/src/manifest/types.rs b/crates/nexum-engine/src/manifest/types.rs index 403a201c..e91bff1f 100644 --- a/crates/nexum-engine/src/manifest/types.rs +++ b/crates/nexum-engine/src/manifest/types.rs @@ -1,7 +1,7 @@ //! Data structures: `Manifest`, sections, and `LoadedManifest`. //! //! Plain serde shapes plus the `KNOWN_CAPABILITIES` registry. The parsing -//! and validation logic lives in [`super::load`]; capability enforcement +//! and validation logic lives in [`mod@super::load`]; capability enforcement //! in [`super::capabilities`]. use serde::Deserialize; diff --git a/crates/nexum-engine/src/supervisor.rs b/crates/nexum-engine/src/supervisor.rs index 1462a224..f0fa480a 100644 --- a/crates/nexum-engine/src/supervisor.rs +++ b/crates/nexum-engine/src/supervisor.rs @@ -135,8 +135,10 @@ impl Supervisor { } let legacy = dir.join("nexum.toml"); if legacy.exists() { - eprintln!( - "[deprecation] nexum.toml is deprecated; rename to module.toml \ + warn!( + target: "manifest", + path = %legacy.display(), + "nexum.toml is deprecated; rename to module.toml \ (ADR-0001). Support will be removed in 0.3." ); return Some(legacy); diff --git a/modules/ethflow-watcher/Cargo.toml b/modules/ethflow-watcher/Cargo.toml new file mode 100644 index 00000000..2636236a --- /dev/null +++ b/modules/ethflow-watcher/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "ethflow-watcher" +version = "0.1.0" +edition.workspace = true +license.workspace = true +repository.workspace = true + +[lib] +crate-type = ["cdylib"] + +[dependencies] +cowprotocol = { version = "1.0.0-alpha.3", default-features = false } +alloy-primitives = { workspace = true, default-features = false, features = ["std"] } +alloy-sol-types = { workspace = true, default-features = false, features = ["std"] } +serde_json = { version = "1", default-features = false, features = ["alloc"] } +wit-bindgen.workspace = true diff --git a/modules/ethflow-watcher/module.toml b/modules/ethflow-watcher/module.toml new file mode 100644 index 00000000..1732901e --- /dev/null +++ b/modules/ethflow-watcher/module.toml @@ -0,0 +1,39 @@ +# ethflow-watcher: see `CoWSwapEthFlow.OrderPlacement`, lift the embedded +# `GPv2OrderData` into an `OrderCreation`, and submit it via the CoW +# Protocol orderbook with the EIP-1271 signing scheme. + +[module] +name = "ethflow-watcher" +version = "0.1.0" +# Placeholder content hash. 0.2 parses but does not verify this; 0.3 will +# compare it against the sha256 of the loaded component bytes. +component = "sha256:0000000000000000000000000000000000000000000000000000000000000000" + +[capabilities] +# Least-privilege: the module exercises logging, local-store and +# cow-api today; `chain` is listed as optional so a follow-up (e.g. +# BLEU-855 adding an eth_call to read the EthFlow refund pointer) +# can use it without manifest churn, without widening the required +# grant for a capability the module does not call yet. +required = ["logging", "local-store", "cow-api"] +optional = ["chain"] + +[capabilities.http] +# All outbound HTTP goes through `cow-api`; no direct `http` calls. +allow = [] + +# --- subscriptions ------------------------------------------------------ + +# CoWSwapEthFlow.OrderPlacement on Sepolia. topic-0 = keccak256( +# "OrderPlacement(address,(address,address,address,uint256,uint256,uint32, +# bytes32,uint256,bytes32,bool,bytes32,bytes32),(uint8,bytes),bytes)"). +# `address` is the Sepolia ETH_FLOW_PRODUCTION deployment from +# `cowprotocol/ethflowcontract/networks.prod.json`. Unlike +# ComposableCoW's CREATE2 address, EthFlow has had multiple per-network +# and per-version deployments; M5 multi-chain config MUST re-check the +# address per `chain_id` instead of assuming this value carries. +[[subscription]] +kind = "log" +chain_id = 11155111 +address = "0xbA3cB449bD2B4ADddBc894D8697F5170800EAdeC" +event_signature = "0xcf5f9de2984132265203b5c335b25727702ca77262ff622e136baa7362bf1da9" diff --git a/modules/ethflow-watcher/src/lib.rs b/modules/ethflow-watcher/src/lib.rs new file mode 100644 index 00000000..de4872c5 --- /dev/null +++ b/modules/ethflow-watcher/src/lib.rs @@ -0,0 +1,541 @@ +// wit_bindgen::generate! expands to host-import shims whose arity matches +// the WIT signatures, which can exceed clippy's too-many-arguments threshold. +#![allow(clippy::too_many_arguments)] + +wit_bindgen::generate!({ + path: ["../../wit/nexum-host", "../../wit/shepherd-cow"], + world: "shepherd:cow/shepherd", + generate_all, +}); + +use alloy_primitives::{Address, B256, Bytes}; +use alloy_sol_types::SolEvent; +use cowprotocol::{ + ApiError, BuyTokenDestination, Chain, CoWSwapOnchainOrders::OrderPlacement, + EMPTY_APP_DATA_JSON, ETH_FLOW_PRODUCTION, ETH_FLOW_STAGING, GPv2OrderData, OnchainSignature, + OnchainSigningScheme, OrderCreation, OrderData, OrderKind, OrderUid, SellTokenSource, + Signature, +}; +use nexum::host::{local_store, logging, types}; +use shepherd::cow::cow_api; + +/// Fully decoded payload of a `CoWSwapOnchainOrders.OrderPlacement` log. +/// `GPv2OrderData` is ~300 bytes; box it so the struct stays cache- +/// friendly through the submit path. +#[derive(Debug)] +struct DecodedPlacement { + /// EthFlow contract that emitted the event — also the EIP-1271 + /// verifier `from` for the submitted `OrderCreation`. + contract: Address, + /// Original native-token seller — logged for diagnostics; the + /// orderbook's `from` is the contract (EIP-1271 owner), not this. + sender: Address, + order: Box, + signature: OnchainSignature, + /// Refund pointer / opaque placer metadata. Not consumed by the + /// submit path today, but the field is part of the BLEU-832 + /// decoder contract. + #[allow(dead_code)] + data: Bytes, +} + +/// What the lifecycle layer should do after a failed submission. +/// Mirrors the BLEU-829 dispatch contract on the TWAP module; the +/// `Backoff` arm has no producer until a server-supplied hint exists. +#[derive(Debug, Eq, PartialEq)] +enum RetryAction { + TryNextBlock, + #[allow(dead_code)] + Backoff { + seconds: u64, + }, + Drop, +} + +struct EthFlowWatcher; + +impl Guest for EthFlowWatcher { + fn init(_config: Vec<(String, String)>) -> Result<(), HostError> { + logging::log(logging::Level::Info, "ethflow-watcher init"); + Ok(()) + } + + fn on_event(event: types::Event) -> Result<(), HostError> { + if let types::Event::Logs(logs) = event { + for log in &logs { + if let Some(placement) = + decode_order_placement(&log.address, &log.topics, &log.data) + { + submit_placement(log.chain_id, &placement)?; + } + } + } + // Block / Tick / Message are not used by this module. + Ok(()) + } +} + +// ---- BLEU-832: decode ---- + +/// Decode a raw event log against `CoWSwapOnchainOrders.OrderPlacement`. +/// +/// Returns `None` when: +/// - the log's contract address is neither `ETH_FLOW_PRODUCTION` nor +/// `ETH_FLOW_STAGING` (defensive — the host's `[[subscription]]` +/// filter already pins the address, but a misconfigured engine could +/// still leak through); +/// - topic0 does not match the event signature; or +/// - the ABI body fails to decode. +fn decode_order_placement( + address: &[u8], + topics: &[Vec], + data: &[u8], +) -> Option { + if address.len() != 20 { + return None; + } + let contract = Address::from_slice(address); + if contract != ETH_FLOW_PRODUCTION && contract != ETH_FLOW_STAGING { + return None; + } + let topic0 = topics.first()?; + if topic0.len() != 32 || B256::from_slice(topic0) != OrderPlacement::SIGNATURE_HASH { + return None; + } + let words: Vec = topics + .iter() + .filter(|t| t.len() == 32) + .map(|t| B256::from_slice(t)) + .collect(); + let decoded = OrderPlacement::decode_raw_log(words, data).ok()?; + Some(DecodedPlacement { + contract, + sender: decoded.sender, + order: Box::new(decoded.order), + signature: decoded.signature, + data: decoded.data, + }) +} + +// ---- BLEU-833: submit + retry ---- + +#[derive(Debug)] +enum BuildError { + UnknownMarker, + UnknownSignatureScheme, + UnsupportedChain(u64), + Cowprotocol(cowprotocol::Error), +} + +impl core::fmt::Display for BuildError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::UnknownMarker => f.write_str("GPv2OrderData carried an unknown enum marker"), + Self::UnknownSignatureScheme => { + f.write_str("OnchainSignature carried an unknown scheme variant") + } + Self::UnsupportedChain(id) => write!(f, "chain {id} is not supported by cowprotocol"), + Self::Cowprotocol(e) => write!(f, "{e}"), + } + } +} + +fn gpv2_to_order_data(gpv2: &GPv2OrderData) -> Option { + Some(OrderData { + sell_token: gpv2.sellToken, + buy_token: gpv2.buyToken, + receiver: (gpv2.receiver != Address::ZERO).then_some(gpv2.receiver), + sell_amount: gpv2.sellAmount, + buy_amount: gpv2.buyAmount, + valid_to: gpv2.validTo, + app_data: gpv2.appData, + fee_amount: gpv2.feeAmount, + kind: OrderKind::from_contract_bytes(gpv2.kind)?, + partially_fillable: gpv2.partiallyFillable, + sell_token_balance: SellTokenSource::from_contract_bytes(gpv2.sellTokenBalance)?, + buy_token_balance: BuyTokenDestination::from_contract_bytes(gpv2.buyTokenBalance)?, + }) +} + +/// Lift `OnchainSignature` into the orderbook-typed `Signature`. The +/// EthFlow contract is the EIP-1271 verifier, so the `data` blob is +/// the raw verifier bytes; for `PreSign` the orderbook accepts an +/// empty payload. +fn to_signature(sig: &OnchainSignature) -> Option { + // sol! adds a hidden `__Invalid` variant on every Solidity enum, so + // exhaustive patterns require a wildcard; we surface it as `None` + // (caller falls back to skipping the placement) rather than panic. + match sig.scheme { + OnchainSigningScheme::Eip1271 => Some(Signature::Eip1271(sig.data.to_vec())), + OnchainSigningScheme::PreSign => Some(Signature::PreSign), + _ => None, + } +} + +/// Assemble `(OrderCreation, OrderUid)` from a placement. `from` is the +/// EthFlow contract (EIP-1271 owner). `app_data` is fixed to +/// `EMPTY_APP_DATA_JSON` — placements pinning a real IPFS document get +/// rejected by `from_signed_order_data` (digest mismatch) and skipped, +/// same scope limitation as the TWAP module. +fn build_eth_flow_creation( + chain_id: u64, + placement: &DecodedPlacement, +) -> Result<(OrderCreation, OrderUid), BuildError> { + let chain = Chain::try_from(chain_id).map_err(|_| BuildError::UnsupportedChain(chain_id))?; + let domain = chain.settlement_domain(); + let order_data = gpv2_to_order_data(&placement.order).ok_or(BuildError::UnknownMarker)?; + let uid = order_data.uid(&domain, placement.contract); + let signature = to_signature(&placement.signature).ok_or(BuildError::UnknownSignatureScheme)?; + let creation = OrderCreation::from_signed_order_data( + &order_data, + signature, + placement.contract, + EMPTY_APP_DATA_JSON.to_string(), + None, + ) + .map_err(BuildError::Cowprotocol)?; + Ok((creation, uid)) +} + +fn submit_placement(chain_id: u64, placement: &DecodedPlacement) -> Result<(), HostError> { + let (creation, uid) = match build_eth_flow_creation(chain_id, placement) { + Ok(x) => x, + Err(e) => { + logging::log( + logging::Level::Warn, + &format!( + "ethflow submit skipped (sender={:#x}): {e}", + placement.sender + ), + ); + return Ok(()); + } + }; + let uid_hex = format!("{uid}"); + + // Idempotency. A host reconnect or engine restart may replay the same + // OrderPlacement log; without the guard we would attempt a second + // submit, the orderbook would reject `DuplicateOrder` (permanent), and + // we would end up with both `submitted:` AND `dropped:` written for + // the same UID. `backoff:` is *not* a short-circuit — a previous + // transient error deserves a fresh attempt on re-delivery. + match prior_outcome(&uid_hex)? { + PriorOutcome::Submitted => { + logging::log( + logging::Level::Info, + &format!("ethflow {uid_hex} already submitted; skipping"), + ); + return Ok(()); + } + PriorOutcome::Dropped => { + logging::log( + logging::Level::Info, + &format!("ethflow {uid_hex} previously dropped; skipping"), + ); + return Ok(()); + } + PriorOutcome::None | PriorOutcome::Backoff => {} + } + + let body = match serde_json::to_vec(&creation) { + Ok(b) => b, + Err(e) => { + logging::log( + logging::Level::Error, + &format!("OrderCreation JSON encode failed: {e}"), + ); + return Ok(()); + } + }; + match cow_api::submit_order(chain_id, &body) { + Ok(server_uid) => { + // Persist under the server-supplied UID so downstream + // observers (cow-tooling, dune) join on the same key. The + // client UID we just computed should equal it; a Warn is + // worth a closer look if not (domain/owner divergence). + if server_uid != uid_hex { + logging::log( + logging::Level::Warn, + &format!("ethflow uid drift: local={uid_hex} server={server_uid}"), + ); + } + local_store::set(&format!("submitted:{server_uid}"), b"")?; + // Clear any backoff: marker a prior transient error left + // behind; the terminal `submitted:` flag now supersedes it. + let _ = local_store::delete(&format!("backoff:{server_uid}")); + logging::log( + logging::Level::Info, + &format!("ethflow submitted {server_uid}"), + ); + } + Err(err) => apply_submit_retry(&err, &uid_hex)?, + } + Ok(()) +} + +/// Which terminal / transient marker (if any) the local store carries +/// for `uid_hex`. The submit path short-circuits on `Submitted` / +/// `Dropped`; `Backoff` still proceeds with a fresh attempt; `None` +/// means a clean first try. +#[derive(Debug, Eq, PartialEq)] +enum PriorOutcome { + None, + Submitted, + Backoff, + Dropped, +} + +fn prior_outcome(uid_hex: &str) -> Result { + // Terminal markers take precedence over `backoff:`. `submitted:` is + // checked first because a successful prior attempt is the most + // common reason a log gets re-delivered. + if local_store::get(&format!("submitted:{uid_hex}"))?.is_some() { + return Ok(PriorOutcome::Submitted); + } + if local_store::get(&format!("dropped:{uid_hex}"))?.is_some() { + return Ok(PriorOutcome::Dropped); + } + if local_store::get(&format!("backoff:{uid_hex}"))?.is_some() { + return Ok(PriorOutcome::Backoff); + } + Ok(PriorOutcome::None) +} + +fn try_decode_api_error(err: &HostError) -> Option { + let data = err.data.as_deref()?; + serde_json::from_str::(data).ok() +} + +fn classify_submit_error(err: &HostError) -> RetryAction { + match try_decode_api_error(err) { + Some(api) if api.retry_hint() => RetryAction::TryNextBlock, + Some(_) => RetryAction::Drop, + // Safe default — a flaky orderbook should not be treated as a + // permanent rejection. + None => RetryAction::TryNextBlock, + } +} + +fn apply_submit_retry(err: &HostError, uid_hex: &str) -> Result<(), HostError> { + match classify_submit_error(err) { + RetryAction::TryNextBlock | RetryAction::Backoff { .. } => { + local_store::set(&format!("backoff:{uid_hex}"), b"")?; + logging::log( + logging::Level::Warn, + &format!("ethflow backoff {uid_hex} ({}): {}", err.code, err.message), + ); + } + RetryAction::Drop => { + local_store::set(&format!("dropped:{uid_hex}"), b"")?; + // Clear `backoff:` if a prior transient attempt left it + // behind — the terminal `dropped:` flag now supersedes it, + // and we want at most one "outcome" marker per UID at rest. + let _ = local_store::delete(&format!("backoff:{uid_hex}")); + logging::log( + logging::Level::Warn, + &format!("ethflow dropped {uid_hex} ({}): {}", err.code, err.message), + ); + } + } + Ok(()) +} + +export!(EthFlowWatcher); + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::{U256, address, hex}; + use alloy_sol_types::SolValue; + + fn submittable_order() -> GPv2OrderData { + GPv2OrderData { + sellToken: address!("6810e776880C02933D47DB1b9fc05908e5386b96"), + buyToken: address!("DAE5F1590db13E3B40423B5b5c5fbf175515910b"), + receiver: address!("DeaDbeefdEAdbeefdEadbEEFdeadbeEFdEaDbeeF"), + sellAmount: U256::from(1_000_000_u64), + buyAmount: U256::from(999_u64), + validTo: 0xffff_ffff, + appData: cowprotocol::EMPTY_APP_DATA_HASH, + feeAmount: U256::ZERO, + kind: OrderKind::SELL, + partiallyFillable: false, + sellTokenBalance: SellTokenSource::ERC20, + buyTokenBalance: BuyTokenDestination::ERC20, + } + } + + fn well_formed_placement() -> DecodedPlacement { + DecodedPlacement { + contract: ETH_FLOW_PRODUCTION, + sender: address!("00112233445566778899aabbccddeeff00112233"), + order: Box::new(submittable_order()), + signature: OnchainSignature { + scheme: OnchainSigningScheme::Eip1271, + data: hex!("c0ffeec0ffeec0ffee").to_vec().into(), + }, + data: Bytes::new(), + } + } + + fn sample_event_for_decode() -> OrderPlacement { + OrderPlacement { + sender: address!("00112233445566778899aabbccddeeff00112233"), + order: submittable_order(), + signature: OnchainSignature { + scheme: OnchainSigningScheme::Eip1271, + data: hex!("c0ffeec0ffeec0ffee").to_vec().into(), + }, + data: hex!("deadbeef").to_vec().into(), + } + } + + fn encode_log(event: &OrderPlacement) -> (Vec>, Vec) { + let mut sender_topic = vec![0u8; 12]; + sender_topic.extend_from_slice(event.sender.as_slice()); + let topics = vec![OrderPlacement::SIGNATURE_HASH.to_vec(), sender_topic]; + let data = ( + event.order.clone(), + event.signature.clone(), + event.data.clone(), + ) + .abi_encode_params(); + (topics, data) + } + + // ---- BLEU-832 regressions ---- + + #[test] + fn decodes_well_formed_placement() { + let event = sample_event_for_decode(); + let (topics, data) = encode_log(&event); + let decoded = decode_order_placement(ETH_FLOW_PRODUCTION.as_slice(), &topics, &data) + .expect("decode succeeds"); + assert_eq!(decoded.contract, ETH_FLOW_PRODUCTION); + assert_eq!(decoded.sender, event.sender); + assert_eq!(decoded.signature.scheme, OnchainSigningScheme::Eip1271); + } + + #[test] + fn rejects_unrelated_contract_address() { + let event = sample_event_for_decode(); + let (topics, data) = encode_log(&event); + let stranger = address!("dead00000000000000000000000000000000dead"); + assert!(decode_order_placement(stranger.as_slice(), &topics, &data).is_none()); + } + + // ---- BLEU-833: order construction ---- + + #[test] + fn build_eip1271_creation_has_contract_as_from() { + let placement = well_formed_placement(); + let (creation, uid) = + build_eth_flow_creation(11_155_111, &placement).expect("build succeeds"); + assert_eq!(creation.from, placement.contract); + assert_eq!(creation.signing_scheme, cowprotocol::SigningScheme::Eip1271); + assert_eq!( + creation.signature.to_bytes(), + placement.signature.data.to_vec(), + ); + // UID layout = digest || owner || valid_to. Owner bytes must + // match the EthFlow contract. + assert_eq!(&uid.as_slice()[32..52], placement.contract.as_slice()); + // Last 4 bytes = validTo big-endian. + assert_eq!( + &uid.as_slice()[52..56], + &placement.order.validTo.to_be_bytes(), + ); + } + + #[test] + fn build_presign_emits_presign_scheme() { + let mut placement = well_formed_placement(); + placement.signature = OnchainSignature { + scheme: OnchainSigningScheme::PreSign, + data: Bytes::new(), + }; + let (creation, _) = build_eth_flow_creation(1, &placement).expect("build succeeds"); + assert_eq!(creation.signing_scheme, cowprotocol::SigningScheme::PreSign); + assert!(creation.signature.to_bytes().is_empty()); + } + + #[test] + fn build_rejects_unsupported_chain() { + let placement = well_formed_placement(); + let err = build_eth_flow_creation(0xdead_beef, &placement).unwrap_err(); + assert!(matches!(err, BuildError::UnsupportedChain(0xdead_beef))); + } + + #[test] + fn build_rejects_unknown_kind_marker() { + let mut placement = well_formed_placement(); + placement.order.kind = B256::repeat_byte(0x42); + let err = build_eth_flow_creation(1, &placement).unwrap_err(); + assert!(matches!(err, BuildError::UnknownMarker)); + } + + #[test] + fn build_rejects_non_empty_app_data() { + let mut placement = well_formed_placement(); + placement.order.appData = B256::repeat_byte(0xee); + let err = build_eth_flow_creation(1, &placement).unwrap_err(); + assert!(matches!(err, BuildError::Cowprotocol(_))); + } + + // ---- BLEU-833: error classification ---- + + fn host_error_with_api(error_type: &str) -> HostError { + let body = serde_json::json!({ + "errorType": error_type, + "description": "test", + }); + HostError { + domain: "cow-api".into(), + kind: nexum::host::types::HostErrorKind::Denied, + code: 400, + message: format!("{error_type}: test"), + data: Some(body.to_string()), + } + } + + #[test] + fn classify_retriable_returns_try_next_block() { + for kind in [ + "InsufficientFee", + "TooManyLimitOrders", + "PriceExceedsMarketPrice", + ] { + assert_eq!( + classify_submit_error(&host_error_with_api(kind)), + RetryAction::TryNextBlock, + ); + } + } + + #[test] + fn classify_permanent_returns_drop() { + for kind in [ + "InvalidSignature", + "WrongOwner", + "DuplicateOrder", + "InvalidErc1271Signature", + ] { + assert_eq!( + classify_submit_error(&host_error_with_api(kind)), + RetryAction::Drop, + ); + } + } + + #[test] + fn classify_missing_data_defaults_to_try_next_block() { + let err = HostError { + domain: "cow-api".into(), + kind: nexum::host::types::HostErrorKind::Internal, + code: 0, + message: "network reset".into(), + data: None, + }; + assert_eq!(classify_submit_error(&err), RetryAction::TryNextBlock); + } +} diff --git a/modules/twap-monitor/Cargo.toml b/modules/twap-monitor/Cargo.toml new file mode 100644 index 00000000..323a4293 --- /dev/null +++ b/modules/twap-monitor/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "twap-monitor" +version = "0.1.0" +edition.workspace = true +license.workspace = true +repository.workspace = true + +[lib] +crate-type = ["cdylib"] + +[dependencies] +cowprotocol = { version = "1.0.0-alpha.3", default-features = false } +alloy-primitives = { workspace = true, default-features = false, features = ["std"] } +alloy-sol-types = { workspace = true, default-features = false, features = ["std"] } +serde_json = { version = "1", default-features = false, features = ["alloc"] } +wit-bindgen.workspace = true diff --git a/modules/twap-monitor/module.toml b/modules/twap-monitor/module.toml new file mode 100644 index 00000000..fb3f3612 --- /dev/null +++ b/modules/twap-monitor/module.toml @@ -0,0 +1,41 @@ +# twap-monitor: poll registered ComposableCoW conditional orders and +# submit ready ones via the CoW Protocol orderbook. + +[module] +name = "twap-monitor" +version = "0.1.0" +# Placeholder content hash. 0.2 parses but does not verify this; 0.3 will +# compare it against the sha256 of the loaded component bytes. +component = "sha256:0000000000000000000000000000000000000000000000000000000000000000" + +[capabilities] +# Host interfaces the module imports and exercises: +# - logging -> structured runtime logs +# - local-store -> watch: / next_block: / next_epoch: / submitted: / +# backoff: / dropped: persistence +# - chain -> eth_call into ComposableCoW.getTradeableOrderWithSignature +# - cow-api -> POST /api/v1/orders submission path +required = ["logging", "local-store", "chain", "cow-api"] +optional = [] + +[capabilities.http] +# All outbound HTTP goes through `cow-api` (which routes through the +# host's pinned orderbook URL); no direct `http` calls. +allow = [] + +# --- subscriptions ------------------------------------------------------ + +# ComposableCoW.ConditionalOrderCreated emissions on Sepolia. topic-0 = +# keccak256("ConditionalOrderCreated(address,(address,bytes32,bytes))"). +# Both `address` and `event_signature` are pinned so the supervisor +# does not deliver unrelated logs to the module. +[[subscription]] +kind = "log" +chain_id = 11155111 +address = "0xfdaFc9d1902f4e0b84f65F49f244b32b31013b74" +event_signature = "0x2cceac5555b0ca45a3744ced542f54b56ad2eb45e521962372eef212a2cbf361" + +# New-block ticks drive the TWAP poll loop (`getTradeableOrderWithSignature`). +[[subscription]] +kind = "block" +chain_id = 11155111 diff --git a/modules/twap-monitor/src/lib.rs b/modules/twap-monitor/src/lib.rs new file mode 100644 index 00000000..8b912d78 --- /dev/null +++ b/modules/twap-monitor/src/lib.rs @@ -0,0 +1,1060 @@ +// wit_bindgen::generate! expands to host-import shims whose arity matches +// the WIT signatures, which can exceed clippy's too-many-arguments threshold. +#![allow(clippy::too_many_arguments)] + +wit_bindgen::generate!({ + path: ["../../wit/nexum-host", "../../wit/shepherd-cow"], + world: "shepherd:cow/shepherd", + generate_all, +}); + +use alloy_primitives::{Address, B256, Bytes, U256, keccak256}; +use alloy_sol_types::{SolCall, SolError, SolEvent, SolValue}; +use cowprotocol::{ + ApiError, BuyTokenDestination, COMPOSABLE_COW, ComposableCoW::ConditionalOrderCreated, + ConditionalOrderParams, EMPTY_APP_DATA_JSON, GPv2OrderData, OrderCreation, OrderData, + OrderKind, SellTokenSource, Signature, +}; +use nexum::host::{chain, local_store, logging, types}; +use shepherd::cow::cow_api; + +mod abi { + use alloy_sol_types::sol; + + sol! { + /// Wire-format mirror of `cowprotocol::ConditionalOrderParams`. sol! + /// cannot reference Rust types declared in another sol! block, but + /// the ABI is identical (same field types in the same order) so the + /// generated call selector matches the real contract. + struct Params { + address handler; + bytes32 salt; + bytes staticInput; + } + + /// Selector source for `eth_call`. The successful return path + /// decodes into the canonical `cowprotocol::GPv2OrderData` instead + /// of duplicating the 12-field struct here. + function getTradeableOrderWithSignature( + address owner, + Params params, + bytes offchainInput, + bytes32[] proof + ) external view; + + /// Five custom errors `IConditionalOrder.verify` reverts with. + /// Source: `cowprotocol/composable-cow/src/interfaces/IConditionalOrder.sol`. + interface IConditionalOrder { + error OrderNotValid(string reason); + error PollTryNextBlock(string reason); + error PollTryAtBlock(uint256 blockNumber, string reason); + error PollTryAtEpoch(uint256 timestamp, string reason); + error PollNever(string reason); + } + } +} + +/// Outcome of a single watch poll. Mirrors the BLEU-827 enum (rather than +/// `cowprotocol::PollOutcome`) so the lifecycle handler in BLEU-830 sees a +/// flat shape, with `Ready` carrying the materials BLEU-828's submit path +/// needs. +#[derive(Debug)] +#[allow(dead_code)] // Variants consumed by BLEU-828 (Ready) and BLEU-830 (others). +enum PollOutcome { + // `GPv2OrderData` is ~300 bytes; box it so this enum stays cache-friendly + // when the lifecycle handler shuffles outcomes around (clippy advice). + Ready { + order: Box, + signature: Bytes, + }, + TryAtEpoch(u64), + TryOnBlock(u64), + TryNextBlock, + DontTryAgain, +} + +struct TwapMonitor; + +impl Guest for TwapMonitor { + fn init(_config: Vec<(String, String)>) -> Result<(), HostError> { + logging::log(logging::Level::Info, "twap-monitor init"); + Ok(()) + } + + fn on_event(event: types::Event) -> Result<(), HostError> { + match event { + types::Event::Logs(logs) => { + for log in &logs { + if let Some((owner, params)) = + decode_conditional_order_created(&log.topics, &log.data) + { + persist_watch(owner, ¶ms)?; + } + } + } + types::Event::Block(block) => poll_all_watches(&block)?, + // Tick / Message are not used by this module. + _ => {} + } + Ok(()) + } +} + +// ---- BLEU-826: indexing path ---- + +/// Decode a raw event log against `ComposableCoW.ConditionalOrderCreated`. +/// +/// Returns `None` when topic0 does not match the event signature or the +/// payload fails ABI decoding — both are non-fatal for an indexer that +/// shares a subscription with adjacent events. +fn decode_conditional_order_created( + topics: &[Vec], + data: &[u8], +) -> Option<(Address, ConditionalOrderParams)> { + let topic0 = topics.first()?; + if topic0.len() != 32 || B256::from_slice(topic0) != ConditionalOrderCreated::SIGNATURE_HASH { + return None; + } + let words: Vec = topics + .iter() + .filter(|t| t.len() == 32) + .map(|t| B256::from_slice(t)) + .collect(); + let decoded = ConditionalOrderCreated::decode_raw_log(words, data).ok()?; + Some((decoded.owner, decoded.params)) +} + +/// `set` overwrites in place, so re-indexing the same log (re-org replay, +/// overlapping subscription windows) produces no observable side effect. +fn persist_watch(owner: Address, params: &ConditionalOrderParams) -> Result<(), HostError> { + let encoded = params.abi_encode(); + let params_hash = keccak256(&encoded); + let key = watch_key(&owner, ¶ms_hash); + local_store::set(&key, &encoded)?; + logging::log(logging::Level::Info, &format!("indexed {key}")); + Ok(()) +} + +// ---- BLEU-827: poll path ---- + +/// Iterate every persisted watch, skip the ones gated by a future +/// `next_block:` / `next_epoch:` entry, and dispatch the ready ones via +/// `eth_call`. +fn poll_all_watches(block: &types::Block) -> Result<(), HostError> { + let now_epoch_s = block.timestamp / 1000; + let keys = local_store::list_keys("watch:")?; + for key in keys { + let Some((owner_hex, hash_hex)) = parse_watch_key(&key) else { + continue; + }; + if !is_ready(owner_hex, hash_hex, block.number, now_epoch_s)? { + continue; + } + let Some(value) = local_store::get(&key)? else { + continue; + }; + let Ok(params) = ConditionalOrderParams::abi_decode(&value) else { + logging::log( + logging::Level::Warn, + &format!("watch {key} carried unparseable params; skipping"), + ); + continue; + }; + let Ok(owner) = owner_hex.parse::
() else { + continue; + }; + let outcome = poll_one(block.chain_id, &owner, ¶ms); + logging::log( + logging::Level::Info, + &format!("poll {key} -> {}", outcome_label(&outcome)), + ); + match outcome { + PollOutcome::Ready { order, signature } => { + submit_ready(block.chain_id, owner, &order, signature, &key, now_epoch_s)?; + } + non_ready => { + apply_watch_update(outcome_to_update(&non_ready), &key)?; + } + } + } + Ok(()) +} + +fn poll_one(chain_id: u64, owner: &Address, params: &ConditionalOrderParams) -> PollOutcome { + let call = abi::getTradeableOrderWithSignatureCall { + owner: *owner, + params: abi::Params { + handler: params.handler, + salt: params.salt, + staticInput: params.staticInput.clone(), + }, + offchainInput: Bytes::new(), + proof: Vec::new(), + }; + let params_json = eth_call_params(&COMPOSABLE_COW, &call.abi_encode()); + match chain::request(chain_id, "eth_call", ¶ms_json) { + Ok(result_json) => parse_eth_call_result(&result_json) + .and_then(|bytes| decode_return(&bytes)) + .unwrap_or(PollOutcome::TryNextBlock), + Err(err) => { + // The host's chain backend currently stuffs the formatted RPC + // error into `message` with `data: None`; once it forwards the + // structured `error.data` from alloy's `RpcError::ErrorResp`, + // those bytes feed into `decode_revert` here. Until then, the + // `data` branch is unreachable on real traffic and the safe + // default is to retry on the next block. + if let Some(data) = err.data.as_deref() + && let Some(outcome) = decode_revert_hex(data) + { + return outcome; + } + logging::log( + logging::Level::Warn, + &format!( + "eth_call failed ({}); defaulting to TryNextBlock", + err.message + ), + ); + PollOutcome::TryNextBlock + } + } +} + +/// Decode a successful `getTradeableOrderWithSignature` return into +/// `Ready { order, signature }`. The wire format is `abi.encode(order, +/// signature)` — the canonical Solidity return tuple — so the two-tuple +/// parameter decode lines up. +fn decode_return(data: &[u8]) -> Option { + let (order, signature) = <(GPv2OrderData, Bytes)>::abi_decode_params(data).ok()?; + Some(PollOutcome::Ready { + order: Box::new(order), + signature, + }) +} + +/// Decode a revert payload (selector + abi-encoded args) into a +/// `PollOutcome`. `None` when the selector is not one of the five +/// `IConditionalOrder` errors — including a bare `Error(string)` +/// require-revert, which the caller treats as TryNextBlock. +fn decode_revert(data: &[u8]) -> Option { + if data.len() < 4 { + return None; + } + let selector: [u8; 4] = data[..4].try_into().ok()?; + let body = &data[4..]; + match selector { + s if s == abi::IConditionalOrder::OrderNotValid::SELECTOR => { + Some(PollOutcome::DontTryAgain) + } + s if s == abi::IConditionalOrder::PollTryNextBlock::SELECTOR => { + Some(PollOutcome::TryNextBlock) + } + s if s == abi::IConditionalOrder::PollTryAtBlock::SELECTOR => { + let decoded = abi::IConditionalOrder::PollTryAtBlock::abi_decode_raw(body).ok()?; + Some(PollOutcome::TryOnBlock(u256_to_u64_saturating( + decoded.blockNumber, + ))) + } + s if s == abi::IConditionalOrder::PollTryAtEpoch::SELECTOR => { + let decoded = abi::IConditionalOrder::PollTryAtEpoch::abi_decode_raw(body).ok()?; + Some(PollOutcome::TryAtEpoch(u256_to_u64_saturating( + decoded.timestamp, + ))) + } + s if s == abi::IConditionalOrder::PollNever::SELECTOR => Some(PollOutcome::DontTryAgain), + _ => None, + } +} + +/// Decode a hex string (with or without `0x` prefix, optionally wrapped in +/// JSON quotes) carrying revert bytes. +fn decode_revert_hex(s: &str) -> Option { + let stripped = s.trim_matches('"'); + let stripped = stripped.strip_prefix("0x").unwrap_or(stripped); + let bytes = alloy_primitives::hex::decode(stripped).ok()?; + decode_revert(&bytes) +} + +fn u256_to_u64_saturating(v: U256) -> u64 { + u64::try_from(v).unwrap_or(u64::MAX) +} + +// ---- BLEU-828: submission path ---- + +/// Convert a freshly-polled `GPv2OrderData` into the `OrderData` shape the +/// orderbook signs against, mapping the on-chain `bytes32` markers for +/// `kind` / `sellTokenBalance` / `buyTokenBalance` to the typed enums. +/// Returns `None` when ComposableCoW emits a marker we don't know — the +/// caller skips the watch instead of submitting a malformed body. +fn gpv2_to_order_data(gpv2: &GPv2OrderData) -> Option { + Some(OrderData { + sell_token: gpv2.sellToken, + buy_token: gpv2.buyToken, + // `from_signed_order_data` already normalises Some(ZERO) -> None, + // but doing it here keeps the EIP-712 hash inputs verbatim if a + // caller bypasses that helper later. + receiver: (gpv2.receiver != Address::ZERO).then_some(gpv2.receiver), + sell_amount: gpv2.sellAmount, + buy_amount: gpv2.buyAmount, + valid_to: gpv2.validTo, + app_data: gpv2.appData, + fee_amount: gpv2.feeAmount, + kind: OrderKind::from_contract_bytes(gpv2.kind)?, + partially_fillable: gpv2.partiallyFillable, + sell_token_balance: SellTokenSource::from_contract_bytes(gpv2.sellTokenBalance)?, + buy_token_balance: BuyTokenDestination::from_contract_bytes(gpv2.buyTokenBalance)?, + }) +} + +/// Assemble the `OrderCreation` body the orderbook expects. +/// +/// `signature` is the EIP-1271 blob `ComposableCoW.getTradeableOrderWith +/// Signature` returns — in orderbook wire form (raw verifier bytes, the +/// orderbook re-prepends `from` before settlement). `from` is the owner +/// that emitted `ConditionalOrderCreated`. +/// +/// `app_data` is left at `EMPTY_APP_DATA_JSON`. If the conditional order +/// pins a non-empty document on IPFS, `from_signed_order_data` rejects the +/// mismatch (`keccak256("{}") != order.app_data`) and we surface the error +/// so the watch is not poisoned — resolving the document is a future +/// concern, not part of this PR. +fn build_order_creation( + order: &GPv2OrderData, + signature: Bytes, + from: Address, +) -> Result { + let order_data = gpv2_to_order_data(order).ok_or(BuildError::UnknownMarker)?; + let signature = Signature::Eip1271(signature.to_vec()); + OrderCreation::from_signed_order_data( + &order_data, + signature, + from, + EMPTY_APP_DATA_JSON.to_string(), + None, + ) + .map_err(BuildError::Cowprotocol) +} + +#[derive(Debug)] +enum BuildError { + /// `GPv2OrderData` carried a marker (`kind`, balance enum) we don't + /// know how to map. + UnknownMarker, + /// `cowprotocol` rejected the body — typically `keccak256(app_data) != + /// order.app_data` (the conditional order pins a non-empty document) + /// or `from == Address::ZERO`. + Cowprotocol(cowprotocol::Error), +} + +impl core::fmt::Display for BuildError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::UnknownMarker => f.write_str("GPv2OrderData carried an unknown enum marker"), + Self::Cowprotocol(e) => write!(f, "{e}"), + } + } +} + +fn submit_ready( + chain_id: u64, + owner: Address, + order: &GPv2OrderData, + signature: Bytes, + watch_key: &str, + now_epoch_s: u64, +) -> Result<(), HostError> { + let creation = match build_order_creation(order, signature, owner) { + Ok(c) => c, + Err(e) => { + logging::log( + logging::Level::Warn, + &format!("twap submit skipped for {owner:#x}: {e}"), + ); + return Ok(()); + } + }; + let body = match serde_json::to_vec(&creation) { + Ok(b) => b, + Err(e) => { + logging::log( + logging::Level::Error, + &format!("OrderCreation JSON encode failed: {e}"), + ); + return Ok(()); + } + }; + match cow_api::submit_order(chain_id, &body) { + Ok(uid) => { + let key = format!("submitted:{uid}"); + // Empty marker — presence of the key is the receipt. BLEU-830 + // may later attach metadata (block, attempt count) but the + // bare flag is enough to suppress double submits. + local_store::set(&key, b"")?; + logging::log(logging::Level::Info, &format!("submitted {key}")); + } + Err(err) => { + apply_submit_retry(&err, watch_key, now_epoch_s)?; + } + } + Ok(()) +} + +// ---- BLEU-829: OrderPostError -> retry action ---- + +/// What the lifecycle layer should do after a failed submission. +/// +/// Mirrors the BLEU-829 retry contract (`TryNextBlock` / `BackoffSeconds(s)` +/// / `Drop`). Today the `Backoff` arm has no producer because the +/// cowprotocol API exposes `retry_hint() -> bool` (no server-supplied +/// delay) — the variant is kept so the dispatcher can grow into it +/// once cowprotocol or the orderbook hands us a hint. +#[derive(Debug, Eq, PartialEq)] +enum RetryAction { + /// Leave the watch in place; it will be polled on the next block. + TryNextBlock, + /// Persist `next_epoch = now + seconds` so the watch is skipped + /// until that timestamp. Reserved for a future producer (the + /// cowprotocol surface today is bool-only, no server delay). + #[allow(dead_code)] + Backoff { seconds: u64 }, + /// Remove the watch entirely — the order will not be retried. + Drop, +} + +/// Try to decode the orderbook's typed error payload from a HostError. +/// +/// The host's `cow_api::submit_order` backend places the orderbook's +/// JSON body in `host-error.data` when the upstream returned a typed +/// `ApiError` (this forwarding is the host-side counterpart to BLEU-829; +/// see PR description for the status of that change). When `data` is +/// missing or fails to parse the function returns `None`, and the +/// dispatcher falls back to the safe default of "retry next block". +fn try_decode_api_error(err: &HostError) -> Option { + let data = err.data.as_deref()?; + serde_json::from_str::(data).ok() +} + +/// Classify a failed submission into the action the lifecycle layer +/// should take. Defaults to `TryNextBlock` whenever the typed payload +/// is absent or unrecognised — the safe choice that lets a flaky +/// orderbook recover without dropping a still-valid order. +fn classify_submit_error(err: &HostError) -> RetryAction { + match try_decode_api_error(err) { + Some(api) if api.retry_hint() => RetryAction::TryNextBlock, + Some(_) => RetryAction::Drop, + None => RetryAction::TryNextBlock, + } +} + +fn apply_submit_retry(err: &HostError, watch_key: &str, now_epoch_s: u64) -> Result<(), HostError> { + let action = classify_submit_error(err); + match action { + RetryAction::TryNextBlock => { + logging::log( + logging::Level::Warn, + &format!("submit retry-next-block ({}): {}", err.code, err.message), + ); + } + RetryAction::Backoff { seconds } => { + let until = now_epoch_s.saturating_add(seconds); + if let Some((owner_hex, hash_hex)) = parse_watch_key(watch_key) { + local_store::set( + &format!("next_epoch:{owner_hex}:{hash_hex}"), + &until.to_le_bytes(), + )?; + } + logging::log( + logging::Level::Warn, + &format!( + "submit backoff {seconds}s -> next_epoch={until} ({}): {}", + err.code, err.message + ), + ); + } + RetryAction::Drop => { + // Drop the watch, plus any stale gating entries the lifecycle + // layer may have written. + local_store::delete(watch_key)?; + if let Some((owner_hex, hash_hex)) = parse_watch_key(watch_key) { + let _ = local_store::delete(&format!("next_block:{owner_hex}:{hash_hex}")); + let _ = local_store::delete(&format!("next_epoch:{owner_hex}:{hash_hex}")); + } + logging::log( + logging::Level::Warn, + &format!("submit dropped watch ({}): {}", err.code, err.message), + ); + } + } + Ok(()) +} + +fn outcome_label(o: &PollOutcome) -> &'static str { + match o { + PollOutcome::Ready { .. } => "Ready", + PollOutcome::TryAtEpoch(_) => "TryAtEpoch", + PollOutcome::TryOnBlock(_) => "TryOnBlock", + PollOutcome::TryNextBlock => "TryNextBlock", + PollOutcome::DontTryAgain => "DontTryAgain", + } +} + +// ---- BLEU-830: PollOutcome lifecycle dispatch ---- + +/// What `apply_watch_update` should do for a given outcome. Kept as a +/// data type (rather than running the effects directly) so the decision +/// is host-free testable; `apply_watch_update` is the impure other half. +#[derive(Debug, Eq, PartialEq)] +enum WatchUpdate { + /// Leave the store untouched. Next block re-polls the watch. + NoOp, + /// Write `next_block:` so subsequent polls skip until the given + /// block number is reached. + SetNextBlock(u64), + /// Write `next_epoch:` so subsequent polls skip until the given + /// Unix-seconds timestamp is reached. + SetNextEpoch(u64), + /// Delete the watch and any stale gate keys — TWAP completed, + /// cancelled, or otherwise irrecoverable. + DropWatch, +} + +/// Pure mapping from a non-Ready `PollOutcome` to the lifecycle effect +/// the BLEU-830 contract specifies. `Ready` is handled by the submit +/// path (BLEU-828) and is rejected here so a caller cannot accidentally +/// erase the watch when an order was actually produced. +fn outcome_to_update(outcome: &PollOutcome) -> WatchUpdate { + match outcome { + PollOutcome::Ready { .. } => WatchUpdate::NoOp, // belt-and-braces; caller routes Ready to submit_ready + PollOutcome::TryNextBlock => WatchUpdate::NoOp, + PollOutcome::TryOnBlock(n) => WatchUpdate::SetNextBlock(*n), + PollOutcome::TryAtEpoch(t) => WatchUpdate::SetNextEpoch(*t), + PollOutcome::DontTryAgain => WatchUpdate::DropWatch, + } +} + +fn apply_watch_update(update: WatchUpdate, watch_key: &str) -> Result<(), HostError> { + match update { + WatchUpdate::NoOp => Ok(()), + WatchUpdate::SetNextBlock(n) => { + if let Some((owner_hex, hash_hex)) = parse_watch_key(watch_key) { + local_store::set( + &format!("next_block:{owner_hex}:{hash_hex}"), + &n.to_le_bytes(), + )?; + } + Ok(()) + } + WatchUpdate::SetNextEpoch(t) => { + if let Some((owner_hex, hash_hex)) = parse_watch_key(watch_key) { + local_store::set( + &format!("next_epoch:{owner_hex}:{hash_hex}"), + &t.to_le_bytes(), + )?; + } + Ok(()) + } + WatchUpdate::DropWatch => { + local_store::delete(watch_key)?; + // Best-effort: drop any stale gates the previous lifecycle + // step may have written. `delete` is a no-op for absent keys + // already, so the `let _` discards a benign error if the + // underlying store complains. + if let Some((owner_hex, hash_hex)) = parse_watch_key(watch_key) { + let _ = local_store::delete(&format!("next_block:{owner_hex}:{hash_hex}")); + let _ = local_store::delete(&format!("next_epoch:{owner_hex}:{hash_hex}")); + } + logging::log(logging::Level::Info, &format!("dropped watch {watch_key}")); + Ok(()) + } + } +} + +// ---- key conventions shared with BLEU-830 ---- + +fn watch_key(owner: &Address, params_hash: &B256) -> String { + format!("watch:{owner:#x}:{params_hash:#x}") +} + +fn parse_watch_key(key: &str) -> Option<(&str, &str)> { + let rest = key.strip_prefix("watch:")?; + let (owner, hash) = rest.split_once(':')?; + Some((owner, hash)) +} + +fn is_ready( + owner_hex: &str, + hash_hex: &str, + block_number: u64, + epoch_s: u64, +) -> Result { + if let Some(next) = read_u64(&format!("next_block:{owner_hex}:{hash_hex}"))? + && block_number < next + { + return Ok(false); + } + if let Some(next) = read_u64(&format!("next_epoch:{owner_hex}:{hash_hex}"))? + && epoch_s < next + { + return Ok(false); + } + Ok(true) +} + +fn read_u64(key: &str) -> Result, HostError> { + let bytes = local_store::get(key)?; + Ok(bytes + .and_then(|b| <[u8; 8]>::try_from(b.as_slice()).ok()) + .map(u64::from_le_bytes)) +} + +// ---- eth_call JSON plumbing ---- + +/// Build the JSON params array for `eth_call`: `[{to, data}, "latest"]`. +fn eth_call_params(to: &Address, data: &[u8]) -> String { + let to_hex = format!("{to:#x}"); + let data_hex = alloy_primitives::hex::encode_prefixed(data); + serde_json::json!([{ "to": to_hex, "data": data_hex }, "latest"]).to_string() +} + +/// The host returns the raw JSON-RPC `result` field. For `eth_call` that +/// is a JSON string holding hex like `"0x1234..."`. Strip the JSON quotes, +/// strip the `0x` prefix, and hex-decode. Returns `None` on shape mismatch. +fn parse_eth_call_result(result_json: &str) -> Option> { + let s = serde_json::from_str::(result_json).ok()?; + let hex = s.strip_prefix("0x").unwrap_or(&s); + alloy_primitives::hex::decode(hex).ok() +} + +export!(TwapMonitor); + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::{address, b256, hex}; + + fn sample_params() -> ConditionalOrderParams { + ConditionalOrderParams { + handler: address!("ffeeddccbbaa00998877665544332211ffeeddcc"), + salt: b256!("0101010101010101010101010101010101010101010101010101010101010101"), + staticInput: hex!("deadbeef").to_vec().into(), + } + } + + fn sample_order() -> GPv2OrderData { + GPv2OrderData { + sellToken: address!("6810e776880C02933D47DB1b9fc05908e5386b96"), + buyToken: address!("DAE5F1590db13E3B40423B5b5c5fbf175515910b"), + receiver: address!("DeaDbeefdEAdbeefdEadbEEFdeadbeEFdEaDbeeF"), + sellAmount: U256::from(1_000_u64), + buyAmount: U256::from(2_000_u64), + validTo: 1_700_000_000, + appData: B256::repeat_byte(0xaa), + feeAmount: U256::ZERO, + kind: B256::repeat_byte(0xbb), + partiallyFillable: false, + sellTokenBalance: B256::repeat_byte(0xcc), + buyTokenBalance: B256::repeat_byte(0xdd), + } + } + + // BLEU-826 regression — the indexer still produces the original tuple. + #[test] + fn decodes_well_formed_log() { + let owner = address!("00112233445566778899aabbccddeeff00112233"); + let params = sample_params(); + let owner_topic = { + let mut t = vec![0u8; 12]; + t.extend_from_slice(owner.as_slice()); + t + }; + let topics = vec![ + ConditionalOrderCreated::SIGNATURE_HASH.to_vec(), + owner_topic, + ]; + let data = params.abi_encode(); + + let (decoded_owner, decoded_params) = + decode_conditional_order_created(&topics, &data).expect("decode succeeds"); + assert_eq!(decoded_owner, owner); + assert_eq!(decoded_params, params); + } + + #[test] + fn rejects_wrong_topic() { + let topics = vec![ + b256!("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").to_vec(), + ]; + assert!(decode_conditional_order_created(&topics, &[]).is_none()); + } + + #[test] + fn rejects_empty_topics() { + assert!(decode_conditional_order_created(&[], &[]).is_none()); + } + + // ---- BLEU-827 ---- + + #[test] + fn decode_return_round_trip() { + let order = sample_order(); + let sig: Bytes = hex!("c0ffeec0ffeec0ffee").to_vec().into(); + let wire = (order.clone(), sig.clone()).abi_encode_params(); + + match decode_return(&wire).expect("decode succeeds") { + PollOutcome::Ready { + order: o, + signature: s, + } => { + assert_eq!(o.sellToken, order.sellToken); + assert_eq!(o.buyAmount, order.buyAmount); + assert_eq!(s, sig); + } + other => panic!("expected Ready, got {other:?}"), + } + } + + #[test] + fn decode_revert_order_not_valid_maps_to_drop() { + let err = abi::IConditionalOrder::OrderNotValid { + reason: "expired".to_string(), + }; + assert!(matches!( + decode_revert(&err.abi_encode()), + Some(PollOutcome::DontTryAgain) + )); + } + + #[test] + fn decode_revert_poll_never_maps_to_drop() { + let err = abi::IConditionalOrder::PollNever { + reason: "cancelled".to_string(), + }; + assert!(matches!( + decode_revert(&err.abi_encode()), + Some(PollOutcome::DontTryAgain) + )); + } + + #[test] + fn decode_revert_try_next_block() { + let err = abi::IConditionalOrder::PollTryNextBlock { + reason: "noop".to_string(), + }; + assert!(matches!( + decode_revert(&err.abi_encode()), + Some(PollOutcome::TryNextBlock) + )); + } + + #[test] + fn decode_revert_try_at_block_carries_number() { + let err = abi::IConditionalOrder::PollTryAtBlock { + blockNumber: U256::from(12_345_678_u64), + reason: "wait".to_string(), + }; + let outcome = decode_revert(&err.abi_encode()).expect("decode succeeds"); + assert!(matches!(outcome, PollOutcome::TryOnBlock(n) if n == 12_345_678)); + } + + #[test] + fn decode_revert_try_at_epoch_carries_timestamp() { + let err = abi::IConditionalOrder::PollTryAtEpoch { + timestamp: U256::from(1_700_000_000_u64), + reason: "soon".to_string(), + }; + let outcome = decode_revert(&err.abi_encode()).expect("decode succeeds"); + assert!(matches!(outcome, PollOutcome::TryAtEpoch(t) if t == 1_700_000_000)); + } + + #[test] + fn decode_revert_unknown_selector_returns_none() { + let mut data = vec![0xde, 0xad, 0xbe, 0xef]; + data.extend_from_slice(&[0u8; 32]); + assert!(decode_revert(&data).is_none()); + } + + #[test] + fn decode_revert_truncated_returns_none() { + assert!(decode_revert(&[0x01, 0x02]).is_none()); + } + + #[test] + fn decode_revert_hex_strips_prefix_and_quotes() { + let err = abi::IConditionalOrder::PollTryAtBlock { + blockNumber: U256::from(42_u64), + reason: "x".to_string(), + }; + let payload = alloy_primitives::hex::encode_prefixed(err.abi_encode()); + let quoted = format!("\"{payload}\""); + assert!(matches!( + decode_revert_hex("ed), + Some(PollOutcome::TryOnBlock(42)) + )); + } + + #[test] + fn u256_overflow_saturates() { + assert_eq!(u256_to_u64_saturating(U256::MAX), u64::MAX); + assert_eq!(u256_to_u64_saturating(U256::from(42_u64)), 42); + } + + #[test] + fn parse_eth_call_result_decodes_hex_string() { + assert_eq!( + parse_eth_call_result(r#""0xdeadbeef""#), + Some(vec![0xde, 0xad, 0xbe, 0xef]) + ); + } + + #[test] + fn parse_eth_call_result_handles_empty_hex() { + assert_eq!(parse_eth_call_result(r#""0x""#), Some(vec![])); + } + + #[test] + fn eth_call_params_shape() { + let to = address!("fdaFc9d1902f4e0b84f65F49f244b32b31013b74"); + let data = hex!("aabbcc").to_vec(); + let p = eth_call_params(&to, &data); + let parsed: serde_json::Value = serde_json::from_str(&p).unwrap(); + assert_eq!( + parsed[0]["to"], + "0xfdafc9d1902f4e0b84f65f49f244b32b31013b74" + ); + assert_eq!(parsed[0]["data"], "0xaabbcc"); + assert_eq!(parsed[1], "latest"); + } + + #[test] + fn watch_key_round_trips_via_parse() { + let owner = address!("00112233445566778899aabbccddeeff00112233"); + let hash = b256!("0202020202020202020202020202020202020202020202020202020202020202"); + let key = watch_key(&owner, &hash); + let (o, h) = parse_watch_key(&key).expect("parse"); + assert_eq!(o.parse::
().unwrap(), owner); + assert_eq!(h.parse::().unwrap(), hash); + } + + // ---- BLEU-828: submission shape ---- + + fn submittable_order() -> GPv2OrderData { + GPv2OrderData { + sellToken: address!("6810e776880C02933D47DB1b9fc05908e5386b96"), + buyToken: address!("DAE5F1590db13E3B40423B5b5c5fbf175515910b"), + receiver: Address::ZERO, + sellAmount: U256::from(1_000_000_u64), + buyAmount: U256::from(999_u64), + validTo: 0xffff_ffff, + appData: cowprotocol::EMPTY_APP_DATA_HASH, + feeAmount: U256::ZERO, + kind: OrderKind::SELL, + partiallyFillable: false, + sellTokenBalance: SellTokenSource::ERC20, + buyTokenBalance: BuyTokenDestination::ERC20, + } + } + + #[test] + fn gpv2_to_order_data_normalises_zero_receiver_to_none() { + let mut g = submittable_order(); + g.receiver = Address::ZERO; + let od = gpv2_to_order_data(&g).expect("known markers"); + assert_eq!(od.receiver, None); + } + + #[test] + fn gpv2_to_order_data_preserves_non_zero_receiver() { + let mut g = submittable_order(); + g.receiver = address!("DeaDbeefdEAdbeefdEadbEEFdeadbeEFdEaDbeeF"); + let od = gpv2_to_order_data(&g).expect("known markers"); + assert_eq!(od.receiver, Some(g.receiver)); + } + + #[test] + fn gpv2_to_order_data_unknown_kind_returns_none() { + let mut g = submittable_order(); + g.kind = B256::repeat_byte(0x42); + assert!(gpv2_to_order_data(&g).is_none()); + } + + #[test] + fn gpv2_to_order_data_unknown_sell_token_balance_returns_none() { + let mut g = submittable_order(); + g.sellTokenBalance = B256::repeat_byte(0x99); + assert!(gpv2_to_order_data(&g).is_none()); + } + + #[test] + fn build_order_creation_succeeds_with_empty_app_data() { + let owner = address!("00112233445566778899aabbccddeeff00112233"); + let sig: Bytes = hex!("c0ffeec0ffeec0ffee").to_vec().into(); + let creation = + build_order_creation(&submittable_order(), sig.clone(), owner).expect("build succeeds"); + assert_eq!(creation.from, owner); + assert_eq!(creation.signing_scheme, cowprotocol::SigningScheme::Eip1271); + assert_eq!(creation.signature.to_bytes(), sig.to_vec()); + assert_eq!(creation.app_data, cowprotocol::EMPTY_APP_DATA_JSON); + assert_eq!(creation.app_data_hash, cowprotocol::EMPTY_APP_DATA_HASH); + // serde round-trip — the submit path serialises this exact value. + let body = serde_json::to_vec(&creation).expect("json encode"); + let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(parsed["signingScheme"], "eip1271"); + assert_eq!(parsed["from"], format!("{owner:#x}")); + } + + #[test] + fn build_order_creation_rejects_non_empty_app_data() { + // ComposableCoW orders that pin a real document on IPFS get + // skipped: we only carry `EMPTY_APP_DATA_JSON` in this PR. + let mut order = submittable_order(); + order.appData = B256::repeat_byte(0xee); + let owner = address!("00112233445566778899aabbccddeeff00112233"); + let err = build_order_creation(&order, Bytes::new(), owner).unwrap_err(); + assert!(matches!(err, BuildError::Cowprotocol(_))); + } + + #[test] + fn build_order_creation_rejects_zero_from() { + let err = + build_order_creation(&submittable_order(), Bytes::new(), Address::ZERO).unwrap_err(); + assert!(matches!(err, BuildError::Cowprotocol(_))); + } + + // ---- BLEU-829: submit-error classification ---- + + fn host_error_with_api(error_type: &str) -> HostError { + let body = serde_json::json!({ + "errorType": error_type, + "description": "test", + }); + HostError { + domain: "cow-api".into(), + kind: nexum::host::types::HostErrorKind::Denied, + code: 400, + message: format!("{error_type}: test"), + data: Some(body.to_string()), + } + } + + #[test] + fn classify_retriable_kind_returns_try_next_block() { + // InsufficientFee / TooManyLimitOrders / PriceExceedsMarketPrice + // are the three kinds cowprotocol::OrderPostErrorKind flags + // retriable today. + for kind in [ + "InsufficientFee", + "TooManyLimitOrders", + "PriceExceedsMarketPrice", + ] { + assert_eq!( + classify_submit_error(&host_error_with_api(kind)), + RetryAction::TryNextBlock, + "{kind} should be retriable", + ); + } + } + + #[test] + fn classify_permanent_kind_returns_drop() { + for kind in [ + "InvalidSignature", + "WrongOwner", + "DuplicateOrder", + "UnsupportedToken", + "InvalidAppData", + ] { + assert_eq!( + classify_submit_error(&host_error_with_api(kind)), + RetryAction::Drop, + "{kind} should be permanent", + ); + } + } + + #[test] + fn classify_unknown_kind_returns_drop() { + // `Unknown(_)` is non-retriable per cowprotocol's classification + // — the orderbook rejected the order with a string we don't + // recognise, so retrying as-is is unlikely to help. + assert_eq!( + classify_submit_error(&host_error_with_api("NewlyMintedErrorType")), + RetryAction::Drop, + ); + } + + #[test] + fn classify_missing_data_defaults_to_try_next_block() { + // Until the host backend forwards the orderbook JSON into + // host-error.data, we have no payload to decode. The safe + // default is to retry rather than poison a still-valid watch. + let err = HostError { + domain: "cow-api".into(), + kind: nexum::host::types::HostErrorKind::Internal, + code: 0, + message: "network reset".into(), + data: None, + }; + assert_eq!(classify_submit_error(&err), RetryAction::TryNextBlock); + } + + #[test] + fn classify_malformed_data_defaults_to_try_next_block() { + let err = HostError { + domain: "cow-api".into(), + kind: nexum::host::types::HostErrorKind::Denied, + code: 502, + message: "bad gateway".into(), + data: Some("upstream HTML".into()), + }; + assert_eq!(classify_submit_error(&err), RetryAction::TryNextBlock); + } + + // ---- BLEU-830: PollOutcome -> lifecycle effect ---- + + #[test] + fn outcome_try_next_block_is_no_op() { + assert_eq!( + outcome_to_update(&PollOutcome::TryNextBlock), + WatchUpdate::NoOp, + ); + } + + #[test] + fn outcome_try_on_block_sets_next_block_gate() { + assert_eq!( + outcome_to_update(&PollOutcome::TryOnBlock(12_345)), + WatchUpdate::SetNextBlock(12_345), + ); + } + + #[test] + fn outcome_try_at_epoch_sets_next_epoch_gate() { + assert_eq!( + outcome_to_update(&PollOutcome::TryAtEpoch(1_700_000_000)), + WatchUpdate::SetNextEpoch(1_700_000_000), + ); + } + + #[test] + fn outcome_dont_try_again_drops_watch() { + assert_eq!( + outcome_to_update(&PollOutcome::DontTryAgain), + WatchUpdate::DropWatch, + ); + } + + #[test] + fn outcome_ready_is_handled_by_submit_path_not_lifecycle() { + // Ready never reaches outcome_to_update in poll_all_watches (the + // match routes it to submit_ready). The mapping is a safety net: + // if a future refactor accidentally pipes Ready through here, the + // watch must NOT be erased — submit_ready owns the post-submit + // book-keeping. + let order = Box::new(submittable_order()); + let outcome = PollOutcome::Ready { + order, + signature: Bytes::new(), + }; + assert_eq!(outcome_to_update(&outcome), WatchUpdate::NoOp); + } +}