diff --git a/Cargo.lock b/Cargo.lock index 8cc63002e..c9ee02103 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2659,6 +2659,7 @@ version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" dependencies = [ + "bytemuck", "cfg-if", "crunchy", "num-traits", @@ -4130,6 +4131,7 @@ dependencies = [ "nodedb-fts", "nodedb-graph", "nodedb-mem", + "nodedb-physical", "nodedb-query", "nodedb-raft", "nodedb-spatial", @@ -4296,6 +4298,7 @@ dependencies = [ "nodedb-array", "nodedb-bridge", "nodedb-cluster", + "nodedb-physical", "nodedb-test-support", "nodedb-types", "serde_json", @@ -4413,6 +4416,20 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "nodedb-physical" +version = "0.2.1" +dependencies = [ + "nodedb-array", + "nodedb-graph", + "nodedb-query", + "nodedb-sql", + "nodedb-types", + "serde", + "thiserror 2.0.18", + "zerompk", +] + [[package]] name = "nodedb-query" version = "0.2.1" @@ -4503,6 +4520,7 @@ dependencies = [ "nodedb-bridge", "nodedb-cluster", "nodedb-mem", + "nodedb-physical", "nodedb-types", "nodedb-wal", "serde_json", @@ -4546,7 +4564,9 @@ name = "nodedb-vector" version = "0.2.1" dependencies = [ "arc-swap", + "bytemuck", "crc32c", + "half", "libc", "memmap2", "nodedb-codec", diff --git a/Cargo.toml b/Cargo.toml index aaac9fc72..03560520c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ members = [ "nodedb-columnar", "nodedb-array", "nodedb-sql", + "nodedb-physical", "nodedb-test-support", "nodedb-cluster-tests", "nodedb-client-tests", @@ -58,6 +59,7 @@ nodedb-strict = { path = "nodedb-strict", version = "0.2" } nodedb-columnar = { path = "nodedb-columnar", version = "0.2" } nodedb-array = { path = "nodedb-array", version = "0.2" } nodedb-sql = { path = "nodedb-sql", version = "0.2" } +nodedb-physical = { path = "nodedb-physical", version = "0.2" } nodedb-test-support = { path = "nodedb-test-support" } # Async runtimes @@ -176,6 +178,9 @@ arc-swap = "1" # Safe transmutation for f32/u8 slice casts bytemuck = { version = "1", features = ["derive"] } +# IEEE 754 half-precision (f16) and brain-float (bf16) types +half = { version = "2", features = ["bytemuck"] } + # WebSocket tokio-tungstenite = "0.29" diff --git a/nodedb-client/src/native/client.rs b/nodedb-client/src/native/client.rs deleted file mode 100644 index 5c89b48f0..000000000 --- a/nodedb-client/src/native/client.rs +++ /dev/null @@ -1,615 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -//! High-level native protocol client implementing the `NodeDb` trait. -//! -//! Wraps a connection pool and translates trait calls into native protocol -//! opcodes. Also exposes SQL/DDL methods not covered by the trait. - -use std::collections::HashMap; - -use async_trait::async_trait; -use sonic_rs::JsonValueTrait; - -use nodedb_types::document::Document; -use nodedb_types::error::{ErrorDetails, NodeDbError, NodeDbResult}; -use nodedb_types::filter::{EdgeFilter, MetadataFilter}; -use nodedb_types::graph::GraphStats; -use nodedb_types::id::{EdgeId, NodeId}; -use nodedb_types::protocol::{OpCode, TextFields}; -use nodedb_types::result::{QueryResult, SearchResult, SubGraph}; -use nodedb_types::value::Value; - -use nodedb_types::protocol::Limits; - -use super::pool::{Pool, PoolConfig}; -use super::response_parse::{json_to_value, parse_search_results, parse_subgraph_response}; -use crate::traits::NodeDb; - -/// Native protocol client for NodeDB. -/// -/// Connects via the binary MessagePack protocol. Supports all operations: -/// SQL, DDL, direct Data Plane ops, transactions, session parameters. -pub struct NativeClient { - pool: Pool, -} - -impl NativeClient { - /// Create a client with the given pool configuration. - pub fn new(config: PoolConfig) -> Self { - Self { - pool: Pool::new(config), - } - } - - /// Connect to a NodeDB server with default settings. - pub fn connect(addr: &str) -> Self { - Self::new(PoolConfig { - addr: addr.to_string(), - ..Default::default() - }) - } - - /// Execute a SQL query and return structured results. - /// - /// Retries once with a fresh connection on I/O failure. - pub async fn query(&self, sql: &str) -> NodeDbResult { - let mut conn = self.pool.acquire().await?; - match conn.execute_sql(sql).await { - Ok(r) => Ok(r), - Err(e) if is_connection_error(&e) => { - drop(conn); - let mut conn = self.pool.acquire().await?; - conn.execute_sql(sql).await - } - Err(e) => Err(e), - } - } - - /// Execute a DDL command. - pub async fn ddl(&self, sql: &str) -> NodeDbResult { - let mut conn = self.pool.acquire().await?; - match conn.execute_ddl(sql).await { - Ok(r) => Ok(r), - Err(e) if is_connection_error(&e) => { - drop(conn); - let mut conn = self.pool.acquire().await?; - conn.execute_ddl(sql).await - } - Err(e) => Err(e), - } - } - - /// Begin a transaction. - pub async fn begin(&self) -> NodeDbResult<()> { - let mut conn = self.pool.acquire().await?; - conn.begin().await - } - - /// Commit the current transaction. - pub async fn commit(&self) -> NodeDbResult<()> { - let mut conn = self.pool.acquire().await?; - conn.commit().await - } - - /// Rollback the current transaction. - pub async fn rollback(&self) -> NodeDbResult<()> { - let mut conn = self.pool.acquire().await?; - conn.rollback().await - } - - /// Set a session parameter. - pub async fn set_parameter(&self, key: &str, value: &str) -> NodeDbResult<()> { - let mut conn = self.pool.acquire().await?; - conn.set_parameter(key, value).await - } - - /// Show a session parameter. - pub async fn show_parameter(&self, key: &str) -> NodeDbResult { - let mut conn = self.pool.acquire().await?; - conn.show_parameter(key).await - } - - /// Ping the server. - pub async fn ping(&self) -> NodeDbResult<()> { - let mut conn = self.pool.acquire().await?; - conn.ping().await - } -} - -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl NodeDb for NativeClient { - fn proto_version(&self) -> u16 { - self.pool - .negotiated_meta() - .map(|m| m.proto_version) - .unwrap_or(0) - } - - fn capabilities(&self) -> u64 { - self.pool - .negotiated_meta() - .map(|m| m.capabilities) - .unwrap_or(0) - } - - fn server_version(&self) -> String { - self.pool - .negotiated_meta() - .map(|m| m.server_version) - .unwrap_or_default() - } - - fn limits(&self) -> Limits { - self.pool - .negotiated_meta() - .map(|m| m.limits) - .unwrap_or_default() - } - - async fn vector_search( - &self, - collection: &str, - query: &[f32], - k: usize, - filter: Option<&MetadataFilter>, - ) -> NodeDbResult> { - let mut conn = self.pool.acquire().await?; - let resp = conn - .send( - OpCode::VectorSearch, - build_vector_search_request(collection, query, k, filter), - ) - .await?; - parse_search_results(&resp) - } - - async fn vector_insert( - &self, - collection: &str, - id: &str, - embedding: &[f32], - metadata: Option, - ) -> NodeDbResult<()> { - // Serialize metadata up front. A serialization failure must - // propagate — the prior `unwrap_or_else(|_| "{}")` silently - // replaced caller-supplied metadata with an empty object, which - // is exactly the silent-drop pattern this client guards against - // on every other seam (filter bytes, bind params). - let meta_json = match metadata { - Some(d) => { - let obj: HashMap = d.fields; - sonic_rs::to_string(&obj).map_err(|e| { - NodeDbError::serialization("json", format!("vector_insert metadata: {e}")) - })? - } - None => "{}".to_string(), - }; - let sql = format!( - "INSERT INTO {} (id, embedding, metadata) VALUES ({}, {}, {})", - sql_quote_identifier(collection), - sql_quote_string_literal(id), - format_f32_array(embedding), - sql_quote_string_literal(&meta_json), - ); - let mut conn = self.pool.acquire().await?; - conn.execute_sql(&sql).await?; - Ok(()) - } - - async fn vector_delete(&self, collection: &str, id: &str) -> NodeDbResult<()> { - let sql = format!( - "DELETE FROM {} WHERE id = {}", - sql_quote_identifier(collection), - sql_quote_string_literal(id), - ); - let mut conn = self.pool.acquire().await?; - conn.execute_sql(&sql).await?; - Ok(()) - } - - async fn graph_traverse( - &self, - collection: &str, - start: &NodeId, - depth: u8, - edge_filter: Option<&EdgeFilter>, - ) -> NodeDbResult { - let mut conn = self.pool.acquire().await?; - let resp = conn - .send( - OpCode::GraphHop, - TextFields { - collection: Some(collection.to_string()), - start_node: Some(start.as_str().to_string()), - depth: Some(depth as u32), - edge_label: edge_filter.and_then(|f| f.labels.first().cloned()), - ..Default::default() - }, - ) - .await?; - parse_subgraph_response(&resp) - } - - async fn graph_insert_edge( - &self, - collection: &str, - from: &NodeId, - to: &NodeId, - edge_type: &str, - properties: Option, - ) -> NodeDbResult { - let props_json = match properties { - Some(d) => Some(serde_json::to_value(d.fields).map_err(|e| { - NodeDbError::serialization("json", format!("edge properties: {e}")) - })?), - None => None, - }; - let mut conn = self.pool.acquire().await?; - conn.send( - OpCode::EdgePut, - TextFields { - collection: Some(collection.to_string()), - from_node: Some(from.as_str().to_string()), - to_node: Some(to.as_str().to_string()), - edge_type: Some(edge_type.to_string()), - properties: props_json, - ..Default::default() - }, - ) - .await?; - EdgeId::try_first(from.clone(), to.clone(), edge_type) - .map_err(|e| NodeDbError::storage(format!("invalid edge label: {e}"))) - } - - async fn graph_delete_edge(&self, collection: &str, edge_id: &EdgeId) -> NodeDbResult<()> { - let mut conn = self.pool.acquire().await?; - conn.send( - OpCode::EdgeDelete, - TextFields { - collection: Some(collection.to_string()), - from_node: Some(edge_id.src.as_str().to_string()), - to_node: Some(edge_id.dst.as_str().to_string()), - edge_type: Some(edge_id.label.clone()), - ..Default::default() - }, - ) - .await?; - Ok(()) - } - - async fn graph_stats( - &self, - collection: Option<&str>, - as_of: Option, - ) -> NodeDbResult> { - // Route through the SQL/DSL path — `SHOW GRAPH STATS` is handled - // by the Control Plane's graph-ops dispatcher and returns a compact - // row set: (collection, node_count, edge_count, distinct_label_count, - // labels). `execute_sql` with empty params uses the simple-query wire - // path so DDL-adjacent statements like `SHOW GRAPH STATS` work. - let coll_clause = match collection { - Some(name) => format!(" '{}'", name.replace('\'', "''")), - None => String::new(), - }; - let as_of_clause = match as_of { - Some(ms) => format!(" AS OF SYSTEM TIME {ms}"), - None => String::new(), - }; - let sql = format!("SHOW GRAPH STATS{coll_clause}{as_of_clause}"); - let result = self.execute_sql(&sql, &[]).await?; - - let mut out = Vec::with_capacity(result.rows.len()); - for row in result.rows { - let coll_name = row - .first() - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - let node_count = row.get(1).and_then(|v| v.as_i64()).unwrap_or(0) as u64; - let edge_count = row.get(2).and_then(|v| v.as_i64()).unwrap_or(0) as u64; - let distinct_label_count = row.get(3).and_then(|v| v.as_i64()).unwrap_or(0) as u64; - let labels: Vec<(String, u64)> = row - .get(4) - .and_then(|v| v.as_str()) - .and_then(|s| { - sonic_rs::from_str::>(s) - .ok() - .map(|arr| { - arr.into_iter() - .filter_map(|obj| { - let label = obj["label"].as_str()?.to_string(); - let count = obj["count"].as_u64()?; - Some((label, count)) - }) - .collect() - }) - }) - .unwrap_or_default(); - - out.push(GraphStats { - collection: coll_name, - node_count, - edge_count, - distinct_label_count, - labels, - }); - } - Ok(out) - } - - async fn document_get(&self, collection: &str, id: &str) -> NodeDbResult> { - let mut conn = self.pool.acquire().await?; - let resp = conn - .send( - OpCode::PointGet, - TextFields { - collection: Some(collection.to_string()), - document_id: Some(id.to_string()), - ..Default::default() - }, - ) - .await?; - - let rows = resp.rows.unwrap_or_default(); - if rows.is_empty() { - return Ok(None); - } - - let json_text = rows[0].first().and_then(|v| v.as_str()).unwrap_or("{}"); - let mut doc = Document::new(id); - if let Ok(obj) = sonic_rs::from_str::>(json_text) { - for (k, v) in obj { - doc.set(&k, json_to_value(v)); - } - } - Ok(Some(doc)) - } - - async fn document_put(&self, collection: &str, doc: Document) -> NodeDbResult<()> { - let data = sonic_rs::to_vec(&doc.fields) - .map_err(|e| NodeDbError::serialization("json", format!("doc serialize: {e}")))?; - let mut conn = self.pool.acquire().await?; - conn.send( - OpCode::PointPut, - TextFields { - collection: Some(collection.to_string()), - document_id: Some(doc.id.clone()), - data: Some(data), - ..Default::default() - }, - ) - .await?; - Ok(()) - } - - async fn document_delete(&self, collection: &str, id: &str) -> NodeDbResult<()> { - let mut conn = self.pool.acquire().await?; - conn.send( - OpCode::PointDelete, - TextFields { - collection: Some(collection.to_string()), - document_id: Some(id.to_string()), - ..Default::default() - }, - ) - .await?; - Ok(()) - } - - async fn execute_sql(&self, query: &str, params: &[Value]) -> NodeDbResult { - // Bound parameters travel through `TextFields::sql_params` as a - // zerompk-MessagePack `Vec`. The server-side `handle_sql` - // decodes them and inlines each value as a SQL literal before - // planning, so `$1`, `$2`, … placeholders resolve to the - // caller's values without round-tripping through a brittle - // client-side rewrite. Retries once on a connection-level - // failure with a fresh pool acquisition, matching `query()`. - let mut conn = self.pool.acquire().await?; - match conn.execute_sql_with_params(query, params).await { - Ok(r) => Ok(r), - Err(e) if is_connection_error(&e) => { - drop(conn); - let mut conn = self.pool.acquire().await?; - conn.execute_sql_with_params(query, params).await - } - Err(e) => Err(e), - } - } -} - -/// Build the `TextFields` payload for an `OpCode::VectorSearch` request. -/// -/// The native protocol reserves wire byte 68 for the optional -/// `TextFields::filters: Option>` field. When the trait caller -/// passes a non-`None` `MetadataFilter`, the predicate is serialized -/// here so it travels alongside the SQL/DSL request rather than being -/// dropped at the client. -/// -/// Wire-format note: the inline doc on `TextFields::filters` calls for -/// MessagePack. Until the server-side decoder is wired (the dispatch -/// path currently constructs plans with `filters: Vec::new()`), the -/// client serializes via sonic_rs JSON. The server-side fix will switch -/// both sides to a single agreed encoding; for now the bytes are -/// observable as non-empty, which is what the trait contract requires. -fn build_vector_search_request( - collection: &str, - query: &[f32], - k: usize, - filter: Option<&MetadataFilter>, -) -> TextFields { - let filters_bytes = filter.and_then(|f| { - // Filter encoding is best-effort at this layer: a serialization - // failure must not block the request, but it must not silently - // produce an empty `filters` field either (that would re-create - // the silent-drop pattern this fix is closing). - match sonic_rs::to_vec(f) { - Ok(b) => Some(b), - Err(e) => { - tracing::warn!(error = %e, "failed to serialize metadata filter for native request"); - None - } - } - }); - TextFields { - collection: Some(collection.to_string()), - query_vector: Some(query.to_vec()), - top_k: Some(k as u32), - filters: filters_bytes, - ..Default::default() - } -} - -// ─── Internal helpers ────────────────────────────────────────────── - -fn format_f32_array(arr: &[f32]) -> String { - let inner: Vec = arr.iter().map(|v| format!("{v}")).collect(); - format!("ARRAY[{}]", inner.join(",")) -} - -/// Quote a SQL identifier (collection / column name) by doubling any -/// internal double-quotes and wrapping the result in double-quotes — -/// the SQL standard rule that PostgreSQL applies under -/// `standard_conforming_strings=on`. Mirrors the always-quote variant -/// in `remote_parse::quote_identifier`; kept here to avoid pulling the -/// `remote` feature into the `native` client. -fn sql_quote_identifier(name: &str) -> String { - let escaped = name.replace('"', "\"\""); - format!("\"{escaped}\"") -} - -/// Render a `&str` as a SQL string literal: single-quote-doubled and -/// wrapped in single quotes. Matches `standard_conforming_strings=on` -/// behavior (PG 9.1+ default) which is the only mode the server runs in. -/// Centralizes the escape so call sites can't drift into raw `format!`s -/// without going through it. -fn sql_quote_string_literal(s: &str) -> String { - let escaped = s.replace('\'', "''"); - format!("'{escaped}'") -} - -/// Check if an error is a connection-level failure (worth retrying). -fn is_connection_error(e: &NodeDbError) -> bool { - matches!( - e.details(), - ErrorDetails::SyncConnectionFailed | ErrorDetails::Storage { .. } - ) -} - -#[cfg(test)] -mod tests { - use super::*; - - // NodeDb trait-contract enforcement on the native client. - // - // Symmetric to the remote-side guards in `nodedb-client/src/remote/sql.rs`. - // A request envelope that omits caller-supplied filter / params - // bytes is the silent-drop pattern these tests guard against — the - // server answers without the caller's predicate, returning data - // from the wrong scope. The tests pin the spec at the request- - // builder seam so the envelope carries what the trait promised. - - #[test] - fn vector_search_request_without_filter_omits_filter_bytes() { - // No filter → TextFields.filters stays None. - let req = build_vector_search_request("docs", &[0.1, 0.2], 5, None); - assert_eq!(req.collection.as_deref(), Some("docs")); - assert_eq!(req.query_vector.as_deref(), Some(&[0.1f32, 0.2][..])); - assert_eq!(req.top_k, Some(5)); - assert!( - req.filters.is_none(), - "no-filter case must leave TextFields::filters empty" - ); - } - - #[test] - fn vector_search_request_serializes_metadata_filter() { - // Spec: a non-None filter is serialized into TextFields::filters - // (MessagePack-encoded predicate bytes), not silently dropped. - // The native protocol reserves wire byte 68 for this field; - // the request builder must populate it whenever the trait - // caller passes a non-None filter. - let filter = MetadataFilter::eq("category", Value::String("ai".into())); - let req = build_vector_search_request("docs", &[0.1], 3, Some(&filter)); - assert!( - req.filters.is_some(), - "non-None filter must be serialized into TextFields::filters \ - rather than dropped before reaching the wire" - ); - let bytes = req.filters.expect("filters bytes recorded"); - assert!( - !bytes.is_empty(), - "serialized filter bytes must not be empty" - ); - } - - #[test] - fn execute_sql_encodes_params_into_sql_params_field() { - // Spec: non-empty `params` are encoded as a zerompk-MessagePack - // `Vec` and ride on `TextFields::sql_params`. The - // round-trip below isn't going through a server; it asserts the - // client-side encoding step the trait impl performs is - // reversible by the server-side decoder (same crate, same - // codec). A silent re-encoding into JSON or a lossy - // `Vec` would lose the variant tag and re-create the - // silent-wrong pattern the trait contract is built to prevent. - let params = vec![ - Value::Null, - Value::Bool(true), - Value::Integer(42), - Value::String("alice".into()), - ]; - let bytes = zerompk::to_msgpack_vec(¶ms).expect("encode params"); - let decoded: Vec = - zerompk::from_msgpack(&bytes).expect("decode round-trips on same codec"); - assert_eq!(decoded.len(), 4); - assert!(matches!(decoded[0], Value::Null)); - assert!(matches!(decoded[1], Value::Bool(true))); - assert!(matches!(decoded[2], Value::Integer(42))); - match &decoded[3] { - Value::String(s) => assert_eq!(s, "alice"), - other => panic!("expected Value::String('alice'), got {other:?}"), - } - } - - #[test] - fn format_f32_array_works() { - let arr = [0.1f32, 0.2, 0.3]; - let s = format_f32_array(&arr); - assert!(s.starts_with("ARRAY[")); - assert!(s.contains("0.1")); - assert!(s.ends_with(']')); - } - - #[test] - fn sql_quote_identifier_wraps_and_escapes_double_quotes() { - assert_eq!(sql_quote_identifier("foo"), "\"foo\""); - // Embedded `"` must be doubled per the SQL identifier-escape rule. - assert_eq!(sql_quote_identifier("a\"b"), "\"a\"\"b\""); - } - - #[test] - fn sql_quote_string_literal_escapes_single_quotes() { - assert_eq!(sql_quote_string_literal("plain"), "'plain'"); - // The PG standard rule under `standard_conforming_strings=on`: - // double every embedded `'`. A `O'Reilly` literal that lost its - // escape would close the SQL string early and inject the rest. - assert_eq!(sql_quote_string_literal("O'Reilly"), "'O''Reilly'"); - assert_eq!( - sql_quote_string_literal("'; DROP TABLE x; --"), - "'''; DROP TABLE x; --'" - ); - } - - #[test] - fn sql_quote_string_literal_passes_through_json() { - // The metadata path renders sonic_rs JSON and then quotes it as - // a SQL string. JSON already escapes its own `"` and `\`, so - // only the outer `'` needs SQL escaping. Verify the helper - // doesn't touch JSON-internal quoting. - let json = r#"{"name":"O'Reilly","ok":true}"#; - let quoted = sql_quote_string_literal(json); - // The single quote in `O'Reilly` is doubled; the JSON `"` is left alone. - assert_eq!(quoted, "'{\"name\":\"O''Reilly\",\"ok\":true}'"); - } -} diff --git a/nodedb-client/src/native/client/document.rs b/nodedb-client/src/native/client/document.rs index 75c164e8c..49ec69541 100644 --- a/nodedb-client/src/native/client/document.rs +++ b/nodedb-client/src/native/client/document.rs @@ -8,8 +8,8 @@ use nodedb_types::document::Document; use nodedb_types::error::{NodeDbError, NodeDbResult}; use nodedb_types::protocol::{OpCode, TextFields}; -use super::super::response_parse::json_to_value; use super::core::NativeClient; +use nodedb_types::conversion::json_to_value; impl NativeClient { pub(super) async fn document_get_impl( diff --git a/nodedb-client/src/native/response_parse.rs b/nodedb-client/src/native/response_parse.rs index 00a62cb49..b2bc894bc 100644 --- a/nodedb-client/src/native/response_parse.rs +++ b/nodedb-client/src/native/response_parse.rs @@ -47,9 +47,6 @@ fn parse_single_search_result(v: &serde_json::Value) -> Option { }) } -// Re-export for use by client.rs -pub(crate) use nodedb_types::conversion::json_to_value; - /// Parse a graph traversal response into a SubGraph. pub(crate) fn parse_subgraph_response( resp: &nodedb_types::protocol::NativeResponse, diff --git a/nodedb-client/src/remote/client.rs b/nodedb-client/src/remote/client.rs deleted file mode 100644 index cf8f03d99..000000000 --- a/nodedb-client/src/remote/client.rs +++ /dev/null @@ -1,767 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -//! `NodeDbRemote` connection lifecycle and `NodeDb` trait impl. -//! -//! Connection methods (`connect`, `query_raw`, `execute_raw`) live here. -//! The single `impl NodeDb for NodeDbRemote` block is the only legal -//! place for the trait methods (Rust forbids splitting a trait impl -//! across files); each method is a thin shim that calls the helpers in -//! `super::sql` (SQL/param translation) and `super::parse` (JSON decode). - -use std::collections::HashMap; -use std::sync::Arc; - -use async_trait::async_trait; -use sonic_rs::JsonValueTrait; -use tokio::sync::Mutex; -use tokio_postgres::{Client, NoTls}; - -use nodedb_types::document::Document; -use nodedb_types::dropped_collection::DroppedCollection; -use nodedb_types::error::{NodeDbError, NodeDbResult}; -use nodedb_types::filter::{EdgeFilter, MetadataFilter}; -use nodedb_types::graph::GraphStats; -use nodedb_types::id::{EdgeId, NodeId}; -use nodedb_types::result::{QueryResult, SearchResult, SubGraph, SubGraphEdge, SubGraphNode}; -use nodedb_types::value::Value; - -use crate::remote_parse::{ - format_vector_array, json_to_value, pg_value_to_value, quote_identifier, -}; -use crate::row_decode::parse_dropped_collection_rows; -use crate::traits::NodeDb; - -use super::parse::{parse_graph_traverse_json, parse_vector_search_json}; -use super::sql::{build_vector_search_sql, translate_params}; - -/// Remote NodeDB client. Connects to an Origin instance over pgwire and -/// translates `NodeDb` trait calls into SQL/DSL queries. -pub struct NodeDbRemote { - client: Arc>, -} - -/// Extract a useful detail string from a `tokio_postgres::Error`. -/// -/// Without this, `Display` returns the literal `"db error"` and the -/// SQLSTATE + server message are dropped — every failure surfaces as the -/// same opaque string and is impossible to diagnose without a debug -/// rebuild. Mirrors the harness's `pg_error_detail` so client and test -/// reports look identical. -fn pg_error_detail(e: &tokio_postgres::Error) -> String { - if let Some(db_err) = e.as_db_error() { - format!( - "{}: {} (SQLSTATE {})", - db_err.severity(), - db_err.message(), - db_err.code().code() - ) - } else { - format!("{e}") - } -} - -impl NodeDbRemote { - /// Connect to a NodeDB Origin instance. - /// - /// `config` is a standard PostgreSQL connection string: - /// `"host=localhost port=5432 user=app dbname=mydb"` - pub async fn connect(config: &str) -> NodeDbResult { - let (client, connection) = tokio_postgres::connect(config, NoTls) - .await - .map_err(|e| NodeDbError::sync_connection_failed(e.to_string()))?; - - // Spawn the connection handler — it runs in the background. - tokio::spawn(async move { - if let Err(e) = connection.await { - tracing::error!("pgwire connection error: {e}"); - } - }); - - Ok(Self { - client: Arc::new(Mutex::new(client)), - }) - } - - /// Execute a raw SQL string and return rows as `Vec>`. - async fn query_raw( - &self, - sql: &str, - params: &[&(dyn tokio_postgres::types::ToSql + Sync)], - ) -> NodeDbResult<(Vec, Vec>)> { - let client = self.client.lock().await; - let rows = client.query(sql, params).await.map_err(|e| { - NodeDbError::storage(format!("pgwire query failed: {}", pg_error_detail(&e))) - })?; - - if rows.is_empty() { - return Ok((Vec::new(), Vec::new())); - } - - let columns: Vec = rows[0] - .columns() - .iter() - .map(|c| c.name().to_string()) - .collect(); - - let mut result_rows = Vec::with_capacity(rows.len()); - for row in &rows { - let mut vals = Vec::with_capacity(columns.len()); - for (i, col) in row.columns().iter().enumerate() { - let val = pg_value_to_value(row, i, col.type_()); - vals.push(val); - } - result_rows.push(vals); - } - - Ok((columns, result_rows)) - } - - /// Execute a statement that doesn't return rows (INSERT/UPDATE/DELETE). - async fn execute_raw( - &self, - sql: &str, - params: &[&(dyn tokio_postgres::types::ToSql + Sync)], - ) -> NodeDbResult { - let client = self.client.lock().await; - client.execute(sql, params).await.map_err(|e| { - NodeDbError::storage(format!("pgwire execute failed: {}", pg_error_detail(&e))) - }) - } - - /// Execute a parameterless statement via the simple-query protocol - /// (single `Query` message — no `Parse`/`Bind`/`Describe` round-trip). - /// - /// Required for DDL statements that don't fit the extended-query - /// row-description shape that `Client::query` expects. - /// `simple_query` doesn't support bound parameters, so callers with - /// non-empty params must continue to use `query_raw`. - /// - /// All values come back as strings from the simple-query protocol; - /// we wrap them as `Value::String` and let downstream consumers - /// coerce as needed. - async fn simple_query_raw(&self, sql: &str) -> NodeDbResult<(Vec, Vec>)> { - use tokio_postgres::SimpleQueryMessage; - - let client = self.client.lock().await; - let messages = client.simple_query(sql).await.map_err(|e| { - NodeDbError::storage(format!( - "pgwire simple_query failed: {}", - pg_error_detail(&e) - )) - })?; - - let mut columns: Vec = Vec::new(); - let mut rows: Vec> = Vec::new(); - - for msg in messages { - match msg { - SimpleQueryMessage::RowDescription(fields) => { - columns = fields.iter().map(|f| f.name().to_string()).collect(); - } - SimpleQueryMessage::Row(row) => { - let mut vals = Vec::with_capacity(row.len()); - for i in 0..row.len() { - match row.get(i) { - Some(s) => vals.push(Value::String(s.to_string())), - None => vals.push(Value::Null), - } - } - rows.push(vals); - } - SimpleQueryMessage::CommandComplete(_) => { - // DDL / DML completion — no rows. - } - _ => {} - } - } - Ok((columns, rows)) - } -} - -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl NodeDb for NodeDbRemote { - async fn vector_search( - &self, - collection: &str, - query: &[f32], - k: usize, - filter: Option<&MetadataFilter>, - ) -> NodeDbResult> { - let sql = build_vector_search_sql(collection, query, k, filter)?; - - let (columns, rows) = self.query_raw(&sql, &[]).await?; - - // The DSL path returns JSON in a single "result" column. - if columns.len() == 1 && columns[0] == "result" { - if let Some(row) = rows.first() - && let Some(Value::String(json_text)) = row.first() - { - return parse_vector_search_json(json_text); - } - return Ok(Vec::new()); - } - - // Structured result set: id, distance columns. - let mut results = Vec::with_capacity(rows.len()); - let id_idx = columns.iter().position(|c| c == "id").unwrap_or(0); - let dist_idx = columns.iter().position(|c| c == "distance").unwrap_or(1); - - for row in &rows { - let id = row - .get(id_idx) - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - let distance = row.get(dist_idx).and_then(|v| v.as_f64()).unwrap_or(0.0) as f32; - - results.push(SearchResult { - id, - node_id: None, - distance, - metadata: HashMap::new(), - }); - } - - Ok(results) - } - - async fn vector_insert_field( - &self, - collection: &str, - field_name: &str, - id: &str, - embedding: &[f32], - metadata: Option, - ) -> NodeDbResult<()> { - // Field-aware path: emit `INSERT INTO (id, [, - // metadata]) VALUES ($1, ARRAY[...]<, $2>)` so the vector lands - // on the column named by the trait — not on whichever vector - // column the planner picks when the column name is omitted. - let coll = quote_identifier(collection); - let field = quote_identifier(field_name); - let vec_lit = format_vector_array(embedding); - - let sql = match metadata { - Some(_) => { - format!("INSERT INTO {coll} (id, {field}, metadata) VALUES ($1, {vec_lit}, $2)") - } - None => format!("INSERT INTO {coll} (id, {field}) VALUES ($1, {vec_lit})"), - }; - - if let Some(d) = metadata { - let meta_json = sonic_rs::to_string(&d) - .map_err(|e| NodeDbError::storage(format!("metadata serialization: {e}")))?; - self.execute_raw(&sql, &[&id, &meta_json]).await?; - } else { - self.execute_raw(&sql, &[&id]).await?; - } - Ok(()) - } - - async fn vector_search_field( - &self, - collection: &str, - field_name: &str, - query: &[f32], - k: usize, - filter: Option<&MetadataFilter>, - ) -> NodeDbResult> { - // Field-aware path: use the 2-arg form of `vector_distance` so - // the planner scopes the HNSW lookup to the named column. The - // single-arg form `vector_distance(ARRAY[...])` only works on - // collections that have exactly one vector column. - let coll = quote_identifier(collection); - let field = quote_identifier(field_name); - let vec_lit = format_vector_array(query); - let where_clause = match filter { - Some(f) => { - let rendered = super::sql::render_metadata_filter_public(f)?; - format!(" WHERE {rendered}") - } - None => String::new(), - }; - let sql = format!( - "SELECT id, vector_distance({field}, {vec_lit}) AS distance \ - FROM {coll}{where_clause} \ - ORDER BY vector_distance({field}, {vec_lit}) \ - LIMIT {k}" - ); - - let (columns, rows) = self.query_raw(&sql, &[]).await?; - let id_idx = columns.iter().position(|c| c == "id").unwrap_or(0); - let dist_idx = columns.iter().position(|c| c == "distance").unwrap_or(1); - - let mut results = Vec::with_capacity(rows.len()); - for row in &rows { - let id = row - .get(id_idx) - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - let distance = row.get(dist_idx).and_then(|v| v.as_f64()).unwrap_or(0.0) as f32; - results.push(SearchResult { - id, - node_id: None, - distance, - metadata: HashMap::new(), - }); - } - Ok(results) - } - - async fn vector_insert( - &self, - collection: &str, - id: &str, - embedding: &[f32], - metadata: Option, - ) -> NodeDbResult<()> { - let collection = quote_identifier(collection); - let meta_json = match metadata { - Some(d) => sonic_rs::to_string(&d) - .map_err(|e| NodeDbError::storage(format!("metadata serialization: {e}")))?, - None => "{}".into(), - }; - - let sql = format!( - "INSERT INTO {collection} (id, embedding, metadata) VALUES ($1, {}, $2::jsonb)", - format_vector_array(embedding), - ); - self.execute_raw(&sql, &[&id, &meta_json]).await?; - Ok(()) - } - - async fn vector_delete(&self, collection: &str, id: &str) -> NodeDbResult<()> { - let collection = quote_identifier(collection); - let sql = format!("DELETE FROM {collection} WHERE id = $1"); - self.execute_raw(&sql, &[&id]).await?; - Ok(()) - } - - async fn graph_traverse( - &self, - collection: &str, - start: &NodeId, - depth: u8, - edge_filter: Option<&EdgeFilter>, - ) -> NodeDbResult { - // Server-side DSL: `GRAPH TRAVERSE FROM '' DEPTH - // [LABEL '']`. The Origin graph overlay is tenant-scoped - // (the dispatcher routes on `identity.tenant_id`), so the - // `collection` argument is accepted for trait symmetry with - // `graph_insert_edge` and Lite parity but is not threaded into - // the wire DSL — every edge in the tenant participates in the - // traversal regardless of which collection it was inserted - // into. - let _ = collection; - let label_clause = edge_filter - .and_then(|f| f.labels.first()) - .map(|l| format!(" LABEL '{}'", l.replace('\'', "''"))) - .unwrap_or_default(); - let start_str = start.as_str().replace('\'', "''"); - let sql = format!("GRAPH TRAVERSE FROM '{start_str}' DEPTH {depth}{label_clause}"); - - let (columns, rows) = self.simple_query_raw(&sql).await?; - - if columns.len() == 1 && columns[0] == "result" { - if let Some(row) = rows.first() - && let Some(Value::String(json_text)) = row.first() - { - return parse_graph_traverse_json(json_text); - } - return Ok(SubGraph::empty()); - } - - // Structured: node_id, depth, edge_src, edge_dst, edge_label columns. - let mut nodes = Vec::new(); - let mut edges = Vec::new(); - let mut seen_nodes = std::collections::HashSet::new(); - - for row in &rows { - let node_id_str = row.first().and_then(|v| v.as_str()).unwrap_or(""); - let d = row.get(1).and_then(|v| v.as_i64()).unwrap_or(0) as u8; - - if seen_nodes.insert(node_id_str.to_string()) { - nodes.push(SubGraphNode { - id: NodeId::from_validated(node_id_str.to_owned()), - depth: d, - properties: HashMap::new(), - }); - } - - if let (Some(src), Some(dst), Some(label)) = ( - row.get(2).and_then(|v| v.as_str()), - row.get(3).and_then(|v| v.as_str()), - row.get(4).and_then(|v| v.as_str()), - ) { - edges.push(SubGraphEdge { - id: EdgeId::try_first( - NodeId::from_validated(src.to_owned()), - NodeId::from_validated(dst.to_owned()), - label, - ) - .expect("server wire label already validated"), - from: NodeId::from_validated(src.to_owned()), - to: NodeId::from_validated(dst.to_owned()), - label: label.to_string(), - properties: HashMap::new(), - }); - } - } - - Ok(SubGraph { nodes, edges }) - } - - async fn graph_insert_edge( - &self, - collection: &str, - from: &NodeId, - to: &NodeId, - edge_type: &str, - properties: Option, - ) -> NodeDbResult { - // `GRAPH INSERT EDGE IN '' FROM '' TO '' - // TYPE '