diff --git a/Cargo.lock b/Cargo.lock index 647e5c0f..86437698 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -903,6 +903,7 @@ dependencies = [ "elf-service", "elf-storage", "elf-testkit", + "qdrant-client", "serde", "serde_json", "sqlx", diff --git a/Makefile.toml b/Makefile.toml index a1881736..637bf120 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -85,6 +85,8 @@ command = "cargo" args = [ "vstyle", "curate", + "--language", + "rust", "--workspace", "--all-features" ] @@ -95,6 +97,8 @@ command = "cargo" args = [ "vstyle", "tune", + "--language", + "rust", "--workspace", "--all-features", "--strict", diff --git a/apps/elf-api/Cargo.toml b/apps/elf-api/Cargo.toml index c4d02159..fe5685ef 100644 --- a/apps/elf-api/Cargo.toml +++ b/apps/elf-api/Cargo.toml @@ -26,7 +26,8 @@ elf-storage = { workspace = true } vergen-gitcl = { workspace = true } [dev-dependencies] -sqlx = { workspace = true } -tower = { workspace = true } +qdrant-client = { workspace = true } +sqlx = { workspace = true } +tower = { workspace = true } elf-testkit = { workspace = true } diff --git a/apps/elf-api/src/routes.rs b/apps/elf-api/src/routes.rs index 148f3356..ba37546d 100644 --- a/apps/elf-api/src/routes.rs +++ b/apps/elf-api/src/routes.rs @@ -7,7 +7,10 @@ use axum::{ DefaultBodyLimit, Extension, Path, Query, State, rejection::{JsonRejection, QueryRejection}, }, - http::{HeaderMap, Request, StatusCode}, + http::{ + HeaderMap, Request, StatusCode, + header::{CONTENT_LENGTH, CONTENT_TYPE}, + }, middleware::{self, Next}, response::{IntoResponse, Response}, routing, @@ -437,24 +440,27 @@ pub fn router(state: AppState) -> Router { .route("/v2/notes/ingest", routing::post(notes_ingest)) .route("/v2/events/ingest", routing::post(events_ingest)) .route("/v2/searches", routing::post(searches_create)) - .route("/v2/searches/:search_id", routing::get(searches_get)) - .route("/v2/searches/:search_id/timeline", routing::get(searches_timeline)) - .route("/v2/searches/:search_id/notes", routing::post(searches_notes)) + .route("/v2/searches/{search_id}", routing::get(searches_get)) + .route("/v2/searches/{search_id}/timeline", routing::get(searches_timeline)) + .route("/v2/searches/{search_id}/notes", routing::post(searches_notes)) .route("/v2/graph/query", routing::post(graph_query)) .route("/v2/notes", routing::get(notes_list)) .route( - "/v2/notes/:note_id", + "/v2/notes/{note_id}", routing::get(notes_get).patch(notes_patch).delete(notes_delete), ) - .route("/v2/notes/:note_id/publish", routing::post(notes_publish)) - .route("/v2/notes/:note_id/unpublish", routing::post(notes_unpublish)) - .route("/v2/spaces/:space/grants", routing::get(space_grants_list).post(space_grant_upsert)) - .route("/v2/spaces/:space/grants/revoke", routing::post(space_grant_revoke)) + .route("/v2/notes/{note_id}/publish", routing::post(notes_publish)) + .route("/v2/notes/{note_id}/unpublish", routing::post(notes_unpublish)) + .route( + "/v2/spaces/{space}/grants", + routing::get(space_grants_list).post(space_grant_upsert), + ) + .route("/v2/spaces/{space}/grants/revoke", routing::post(space_grant_revoke)) .with_state(state.clone()) .layer(DefaultBodyLimit::max(MAX_REQUEST_BYTES)); let docs_router = Router::new() .route("/v2/docs", routing::post(docs_put)) - .route("/v2/docs/:doc_id", routing::get(docs_get)) + .route("/v2/docs/{doc_id}", routing::get(docs_get)) .route("/v2/docs/search/l0", routing::post(docs_search_l0)) .route("/v2/docs/excerpts", routing::post(docs_excerpts_get)) .with_state(state) @@ -477,11 +483,11 @@ pub fn admin_router(state: AppState) -> Router { .put(admin_ingestion_profile_default_set), ) .route( - "/v2/admin/events/ingestion-profiles/:profile_id/versions", + "/v2/admin/events/ingestion-profiles/{profile_id}/versions", routing::get(admin_ingestion_profile_versions_list), ) .route( - "/v2/admin/events/ingestion-profiles/:profile_id", + "/v2/admin/events/ingestion-profiles/{profile_id}", routing::get(admin_ingestion_profile_get), ) .route( @@ -491,20 +497,20 @@ pub fn admin_router(state: AppState) -> Router { .route("/v2/admin/qdrant/rebuild", routing::post(rebuild_qdrant)) .route("/v2/admin/searches/raw", routing::post(searches_raw)) .route("/v2/admin/traces/recent", routing::get(trace_recent_list)) - .route("/v2/admin/traces/:trace_id", routing::get(trace_get)) - .route("/v2/admin/traces/:trace_id/bundle", routing::get(trace_bundle_get)) - .route("/v2/admin/trajectories/:trace_id", routing::get(trace_trajectory_get)) - .route("/v2/admin/trace-items/:item_id", routing::get(trace_item_get)) + .route("/v2/admin/traces/{trace_id}", routing::get(trace_get)) + .route("/v2/admin/traces/{trace_id}/bundle", routing::get(trace_bundle_get)) + .route("/v2/admin/trajectories/{trace_id}", routing::get(trace_trajectory_get)) + .route("/v2/admin/trace-items/{item_id}", routing::get(trace_item_get)) .route("/v2/admin/graph/predicates", routing::get(admin_graph_predicates_list)) .route( - "/v2/admin/graph/predicates/:predicate_id", + "/v2/admin/graph/predicates/{predicate_id}", routing::patch(admin_graph_predicate_patch), ) .route( - "/v2/admin/graph/predicates/:predicate_id/aliases", + "/v2/admin/graph/predicates/{predicate_id}/aliases", routing::post(admin_graph_predicate_alias_add).get(admin_graph_predicate_aliases_list), ) - .route("/v2/admin/notes/:note_id/provenance", routing::get(admin_note_provenance_get)) + .route("/v2/admin/notes/{note_id}/provenance", routing::get(admin_note_provenance_get)) .with_state(state) .layer(DefaultBodyLimit::max(MAX_REQUEST_BYTES)) .layer(middleware::from_fn_with_state(auth_state, admin_auth_middleware)) @@ -818,7 +824,7 @@ async fn with_request_id(response: Response, request_id: Uuid) -> Response { let is_json_response = parts .headers - .get(axum::http::header::CONTENT_TYPE) + .get(CONTENT_TYPE) .and_then(|value| value.to_str().ok()) .map(|content_type| content_type.starts_with("application/json")) .unwrap_or(false); @@ -833,7 +839,7 @@ async fn with_request_id(response: Response, request_id: Uuid) -> Response { }; if let Some(response_body) = inject_request_id_into_json_body(&body_bytes, &request_id) { - parts.headers.remove(axum::http::header::CONTENT_LENGTH); + parts.headers.remove(CONTENT_LENGTH); Response::from_parts(parts, Body::from(response_body)) } else { diff --git a/apps/elf-api/tests/http.rs b/apps/elf-api/tests/http.rs index 498f8b6c..fc0d307f 100644 --- a/apps/elf-api/tests/http.rs +++ b/apps/elf-api/tests/http.rs @@ -2,15 +2,20 @@ //! End-to-end HTTP integration tests for the ELF API app. -use std::env; +use std::{collections::HashMap, env}; use axum::{ Router, body::{self, Body}, http::{Request, Response, StatusCode}, }; +use qdrant_client::{ + client::Payload, + qdrant::{Document, PointStruct, UpsertPointsBuilder, Vector}, +}; use serde_json::Map; use tower::util::ServiceExt as _; +use tracing::Level; use uuid::Uuid; use elf_api::{routes, state::AppState}; @@ -23,6 +28,7 @@ use elf_config::{ SearchExpansion, SearchExplain, SearchPrefilter, Security, SecurityAuthKey, SecurityAuthRole, Service, Storage, TtlDays, }; +use elf_storage::qdrant::{BM25_MODEL, BM25_VECTOR_NAME, DENSE_VECTOR_NAME}; use elf_testkit::TestDatabase; const TEST_TENANT_ID: &str = "tenant_alpha"; @@ -192,11 +198,11 @@ fn test_config(dsn: String, qdrant_url: String, collection: String) -> Config { fn dummy_embedding_provider() -> EmbeddingProviderConfig { EmbeddingProviderConfig { - provider_id: "test".to_string(), + provider_id: "local".to_string(), api_base: "http://127.0.0.1:1".to_string(), api_key: "test-key".to_string(), path: "/".to_string(), - model: "test".to_string(), + model: "local-hash".to_string(), dimensions: 4_096, timeout_ms: 1_000, default_headers: Map::new(), @@ -205,11 +211,11 @@ fn dummy_embedding_provider() -> EmbeddingProviderConfig { fn dummy_provider() -> ProviderConfig { ProviderConfig { - provider_id: "test".to_string(), + provider_id: "local".to_string(), api_base: "http://127.0.0.1:1".to_string(), api_key: "test-key".to_string(), path: "/".to_string(), - model: "test".to_string(), + model: "local-token-overlap".to_string(), timeout_ms: 1_000, default_headers: Map::new(), } @@ -228,6 +234,10 @@ fn dummy_llm_provider() -> LlmProviderConfig { } } +fn init_test_tracing() { + let _ = tracing_subscriber::fmt().with_max_level(Level::ERROR).with_test_writer().try_init(); +} + async fn test_env() -> Option<(TestDatabase, String, String)> { let base_dsn = match elf_testkit::env_dsn() { Some(value) => value, @@ -526,9 +536,12 @@ async fn post_with_authorization_and_json_body( async fn create_note_for_payload_level_tests( app: &Router, + state: &AppState, text: &str, source_ref: serde_json::Value, ) -> Uuid { + init_test_tracing(); + let payload = serde_json::json!({ "scope": "agent_private", "notes": [{ @@ -556,12 +569,18 @@ async fn create_note_for_payload_level_tests( ) .await .expect("Failed to call note ingest."); - - assert_eq!(response.status(), StatusCode::OK); - + let status = response.status(); let body = body::to_bytes(response.into_body(), usize::MAX) .await .expect("Failed to read note ingest response body."); + + assert_eq!( + status, + StatusCode::OK, + "Unexpected note ingest status with body: {}", + String::from_utf8_lossy(&body) + ); + let json: serde_json::Value = serde_json::from_slice(&body).expect("Failed to parse note ingest response."); let note_id = json["results"] @@ -570,8 +589,82 @@ async fn create_note_for_payload_level_tests( .first() .and_then(|result| result["note_id"].as_str()) .expect("Missing note_id in note ingest response."); + let note_id = Uuid::parse_str(note_id).expect("Invalid note_id in note ingest response."); + + index_note_for_payload_level_tests(state, note_id, text).await; - Uuid::parse_str(note_id).expect("Invalid note_id in note ingest response.") + note_id +} + +async fn index_note_for_payload_level_tests(state: &AppState, note_id: Uuid, text: &str) { + let chunk_id = Uuid::new_v4(); + let embedding_version = format!( + "{}:{}:{}", + state.service.cfg.providers.embedding.provider_id, + state.service.cfg.providers.embedding.model, + state.service.cfg.storage.qdrant.vector_dim + ); + + sqlx::query( + "INSERT INTO memory_note_chunks ( + chunk_id, + note_id, + chunk_index, + start_offset, + end_offset, + text, + embedding_version + ) VALUES ($1, $2, $3, $4, $5, $6, $7)", + ) + .bind(chunk_id) + .bind(note_id) + .bind(0_i32) + .bind(0_i32) + .bind(i32::try_from(text.len()).expect("Payload-level test text fits i32 offsets.")) + .bind(text) + .bind(embedding_version.as_str()) + .execute(&state.service.db.pool) + .await + .expect("Failed to seed memory note chunk."); + + let mut payload = Payload::new(); + + payload.insert("note_id", note_id.to_string()); + payload.insert("chunk_id", chunk_id.to_string()); + payload.insert("chunk_index", 0_i64); + payload.insert("start_offset", 0_i64); + payload.insert("end_offset", i64::try_from(text.len()).expect("Test text fits i64 offsets.")); + payload.insert("tenant_id", TEST_TENANT_ID); + payload.insert("project_id", TEST_PROJECT_ID); + payload.insert("agent_id", TEST_AGENT_A); + payload.insert("scope", "agent_private"); + payload.insert("type", "fact"); + payload.insert("status", "active"); + payload.insert("embedding_version", embedding_version); + + let mut vectors = HashMap::new(); + + vectors.insert( + DENSE_VECTOR_NAME.to_string(), + Vector::from(vec![0.0_f32; state.service.qdrant.vector_dim as usize]), + ); + vectors.insert( + BM25_VECTOR_NAME.to_string(), + Vector::from(Document::new(text.to_string(), BM25_MODEL)), + ); + + let point = PointStruct::new(chunk_id.to_string(), vectors, payload); + + state + .service + .qdrant + .client + .upsert_points( + UpsertPointsBuilder::new(state.service.qdrant.collection.clone(), vec![point]) + .wait(true), + ) + .await + .expect("Failed to seed Qdrant point."); } async fn insert_note_summary_field(state: &AppState, note_id: Uuid, summary: &str) { @@ -638,6 +731,7 @@ async fn fetch_admin_search_raw_source_ref( payload_level: &str, ) -> serde_json::Value { let payload = serde_json::json!({ + "mode": "quick_find", "query": query, "top_k": 5, "candidate_k": 10, @@ -1402,10 +1496,9 @@ async fn searches_notes_payload_level_shapes_source_ref_and_structured() { } }); let structured_summary = "Compact structured summary used for payload-level l1 and l2 shaping."; - let note_text = "A substantially long payload shaping note used in contract tests for search details output shaping. " - .repeat(6); + let note_text = "A payload shaping note used in contract tests for search details output shaping. It includes deliberate spacing and\nline breaks so l0 compaction can be observed."; let note_id = - create_note_for_payload_level_tests(&app, note_text.as_str(), source_ref.clone()).await; + create_note_for_payload_level_tests(&app, &state, note_text, source_ref.clone()).await; insert_note_summary_field(&state, note_id, structured_summary).await; @@ -1496,9 +1589,9 @@ async fn searches_notes_payload_level_shapes_source_ref_and_structured() { assert!(notes_l1["structured"].is_object()); assert!(notes_l2["structured"].is_object()); assert!(notes_l0_text.len() <= 240); - assert_ne!(notes_l0_text, note_text.as_str()); + assert_ne!(notes_l0_text, note_text); assert_eq!(notes_l1_text, structured_summary); - assert_eq!(notes_l2_text, note_text.as_str()); + assert_eq!(notes_l2_text, note_text); test_db.cleanup().await.expect("Failed to cleanup test database."); } @@ -1512,7 +1605,7 @@ async fn admin_searches_raw_payload_level_shapes_source_ref() { let config = test_config(test_db.dsn().to_string(), qdrant_url, collection); let state = AppState::new(config).await.expect("Failed to initialize app state."); let app = routes::router(state.clone()); - let admin_app = routes::admin_router(state); + let admin_app = routes::admin_router(state.clone()); let source_ref = serde_json::json!({ "schema": "note_source_ref/v1", "locator": { @@ -1526,7 +1619,8 @@ async fn admin_searches_raw_payload_level_shapes_source_ref() { }); let note_text = "Admin raw search payload shaping contract note. This long note should be indexed."; - let _note_id = create_note_for_payload_level_tests(&app, note_text, source_ref.clone()).await; + let _note_id = + create_note_for_payload_level_tests(&app, &state, note_text, source_ref.clone()).await; let raw_l0 = fetch_admin_search_raw_source_ref(&admin_app, "payload shaping", "l0").await; let raw_l1 = fetch_admin_search_raw_source_ref(&admin_app, "payload shaping", "l1").await; let raw_l2 = fetch_admin_search_raw_source_ref(&admin_app, "payload shaping", "l2").await; diff --git a/apps/elf-eval/src/app.rs b/apps/elf-eval/src/app.rs index 4ce754b6..94bd819d 100644 --- a/apps/elf-eval/src/app.rs +++ b/apps/elf-eval/src/app.rs @@ -18,7 +18,7 @@ use uuid::Uuid; use elf_config::Config; use elf_service::{ ElfService, RankingRequestOverride, SearchIndexItem, SearchIndexResponse, SearchRequest, - search::{self, TraceReplayItem}, + search::{self, TraceReplayContext, TraceReplayItem}, }; use elf_storage::{db::Db, qdrant::QdrantStore}; @@ -1096,7 +1096,7 @@ async fn compare_trace_id( let trace_row = fetch_trace_compare_trace_row(db, trace_id).await?; let candidate_rows = fetch_trace_compare_candidate_rows(db, trace_id).await?; let stage_rows = fetch_trace_compare_stage_rows(db, trace_id).await?; - let context = elf_service::search::TraceReplayContext { + let context = TraceReplayContext { trace_id: trace_row.trace_id, query: trace_row.query.clone(), candidate_count: u32::try_from(trace_row.candidate_count).unwrap_or(0), diff --git a/apps/elf-eval/src/bin/trace_regression_gate.rs b/apps/elf-eval/src/bin/trace_regression_gate.rs index f8599180..44dd93e4 100644 --- a/apps/elf-eval/src/bin/trace_regression_gate.rs +++ b/apps/elf-eval/src/bin/trace_regression_gate.rs @@ -14,7 +14,7 @@ use tracing_subscriber::EnvFilter; use uuid::Uuid; use elf_config::Config; -use elf_service::search; +use elf_service::search::{self, TraceReplayContext}; use elf_storage::db::Db; #[derive(Debug, Parser)] @@ -346,7 +346,7 @@ async fn eval_trace( .created_at .format(&Rfc3339) .map_err(|err| eyre::eyre!("Failed to format created_at: {err}"))?; - let context = elf_service::search::TraceReplayContext { + let context = TraceReplayContext { trace_id: trace_row.trace_id, query: trace_row.query.clone(), candidate_count: u32::try_from(trace_row.candidate_count).unwrap_or(0), diff --git a/apps/elf-mcp/src/server.rs b/apps/elf-mcp/src/server.rs index 8a4fc32b..27e75d58 100644 --- a/apps/elf-mcp/src/server.rs +++ b/apps/elf-mcp/src/server.rs @@ -36,6 +36,7 @@ const HEADER_AUTHORIZATION: &str = "Authorization"; enum HttpMethod { Get, Post, + Put, Patch, Delete, } @@ -155,6 +156,29 @@ impl ElfMcp { handle_response(response).await } + async fn forward_put( + &self, + path: &str, + body: Value, + read_profile_override: Option<&str>, + request_id: Uuid, + ) -> Result { + let url = format!("{}{}", self.api_base_for_path(path), path); + let response = self + .apply_context_headers( + self.client.put(url).json(&body), + read_profile_override, + request_id, + ) + .send() + .await + .map_err(|err| { + ErrorData::internal_error(format!("ELF API request failed: {err}"), None) + })?; + + handle_response(response).await + } + async fn forward_delete( &self, path: &str, @@ -212,6 +236,9 @@ impl ElfMcp { .await, HttpMethod::Get => self.forward_get(path, params, read_profile_override, request_id).await, + HttpMethod::Put => + self.forward_put(path, Value::Object(params), read_profile_override, request_id) + .await, HttpMethod::Patch => self.forward_patch(path, Value::Object(params), read_profile_override, request_id) .await, @@ -643,7 +670,7 @@ impl ElfMcp { &self, params: JsonObject, ) -> Result { - self.forward(HttpMethod::Post, "/v2/admin/events/ingestion-profiles/default", params, None) + self.forward(HttpMethod::Put, "/v2/admin/events/ingestion-profiles/default", params, None) .await } } @@ -1487,9 +1514,20 @@ async fn mcp_auth_middleware( #[cfg(test)] mod tests { - use std::collections::HashMap; + use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + time::Duration, + }; - use axum::http::HeaderMap; + use axum::{ + Json, Router, + extract::State, + http::{HeaderMap, Method, Uri}, + routing, + }; + use serde_json::Value; + use tokio::{net::TcpListener, sync::oneshot, time}; use crate::app::{ McpAuthState, @@ -1497,6 +1535,8 @@ mod tests { }; use elf_config::McpContext; + type RequestRecorder = Arc>>>; + const ALL_TOOL_DEFINITIONS: [ToolDefinition; 28] = [ ToolDefinition::new( "elf_notes_ingest", @@ -1662,7 +1702,7 @@ mod tests { ), ToolDefinition::new( "elf_admin_events_ingestion_profile_default_set", - HttpMethod::Post, + HttpMethod::Put, "/v2/admin/events/ingestion-profiles/default", "Set the default ingestion profile for add_event.", ), @@ -1677,6 +1717,12 @@ mod tests { streaming: bool, } + struct RecordedRequest { + method: Method, + path: String, + body: Value, + } + impl ToolDefinition { const fn new( name: &'static str, @@ -2023,4 +2069,86 @@ mod tests { assert!(description.contains("source_ref")); assert!(description.contains("structured")); } + + #[tokio::test] + async fn default_ingestion_profile_set_uses_put_admin_default_path() { + let (admin_base, received) = spawn_recording_admin_server().await; + let context = McpContext { + tenant_id: "tenant-a".to_string(), + project_id: "project-a".to_string(), + agent_id: "agent-a".to_string(), + read_profile: "private_plus_project".to_string(), + }; + let mcp = ElfMcp::new( + "http://127.0.0.1:9000".to_string(), + admin_base, + ElfContextHeaders::new(&context), + McpAuthState::Off, + ); + let params = serde_json::Map::from_iter([ + ("profile_id".to_string(), Value::String("profile-a".to_string())), + ("version".to_string(), Value::Number(2.into())), + ]); + let result = mcp.elf_admin_events_ingestion_profile_default_set(params).await; + + assert!(result.is_ok(), "default setter should forward successfully: {result:?}"); + + let request = receive_recorded_request(received).await; + + assert_eq!(request.method, Method::PUT); + assert_eq!(request.path, "/v2/admin/events/ingestion-profiles/default"); + assert_eq!(request.body.get("profile_id").and_then(Value::as_str), Some("profile-a")); + assert_eq!(request.body.get("version").and_then(Value::as_i64), Some(2)); + } + + async fn spawn_recording_admin_server() -> (String, oneshot::Receiver) { + let (tx, rx) = oneshot::channel(); + let app = Router::new() + .route("/v2/admin/events/ingestion-profiles/default", routing::any(record_request)) + .with_state(Arc::new(Mutex::new(Some(tx)))); + let listener = match TcpListener::bind("127.0.0.1:0").await { + Ok(listener) => listener, + Err(err) => panic!("Failed to bind MCP recording admin server: {err}."), + }; + let addr = match listener.local_addr() { + Ok(addr) => addr, + Err(err) => panic!("Failed to read MCP recording admin server address: {err}."), + }; + + tokio::spawn(async move { + if let Err(err) = axum::serve(listener, app).await { + panic!("MCP recording admin server failed: {err}."); + } + }); + + (format!("http://{addr}"), rx) + } + + async fn record_request( + State(recorder): State, + method: Method, + uri: Uri, + Json(body): Json, + ) -> Json { + let mut sender = match recorder.lock() { + Ok(sender) => sender, + Err(err) => panic!("MCP recording admin server mutex was poisoned: {err}."), + }; + + if let Some(tx) = sender.take() { + let _ = tx.send(RecordedRequest { method, path: uri.path().to_string(), body }); + } + + Json(serde_json::json!({ "ok": true })) + } + + async fn receive_recorded_request( + received: oneshot::Receiver, + ) -> RecordedRequest { + match time::timeout(Duration::from_secs(3), received).await { + Ok(Ok(request)) => request, + Ok(Err(err)) => panic!("MCP recording admin server closed before recording: {err}."), + Err(err) => panic!("Timed out waiting for MCP recording admin server: {err}."), + } + } } diff --git a/apps/elf-worker/src/lib.rs b/apps/elf-worker/src/lib.rs index 6335886f..d8b4bbf3 100644 --- a/apps/elf-worker/src/lib.rs +++ b/apps/elf-worker/src/lib.rs @@ -18,6 +18,7 @@ use elf_storage::{ db::Db, qdrant::{DOCS_SEARCH_FILTER_INDEXES, QdrantStore}, }; +use worker::WorkerState; /// CLI arguments for the worker binary. #[derive(Debug, Parser)] @@ -61,7 +62,7 @@ pub async fn run(args: Args) -> Result<()> { max_tokens: config.chunking.max_tokens, overlap_tokens: config.chunking.overlap_tokens, }; - let state = worker::WorkerState { + let state = WorkerState { db, qdrant, docs_qdrant, diff --git a/docs/guide/getting_started.md b/docs/guide/getting_started.md index b633bd1a..02614ad3 100644 --- a/docs/guide/getting_started.md +++ b/docs/guide/getting_started.md @@ -39,9 +39,9 @@ export ELF_QDRANT_COLLECTION="mem_notes_v2" export ELF_QDRANT_DOCS_COLLECTION="doc_chunks_v1" export ELF_QDRANT_VECTOR_DIM="4096" ./qdrant/init.sh +``` You can still run the script manually when bootstrapping a fresh Qdrant instance, but startup is not blocked if you rely on auto-ensure. -``` ## 3. Start services diff --git a/docs/spec/system_elf_memory_service_v2.md b/docs/spec/system_elf_memory_service_v2.md index 48337f5a..2baa3dc3 100644 --- a/docs/spec/system_elf_memory_service_v2.md +++ b/docs/spec/system_elf_memory_service_v2.md @@ -1499,7 +1499,7 @@ Response: "updated_at": "..." } -POST /v2/admin/events/ingestion-profiles/default +PUT /v2/admin/events/ingestion-profiles/default Headers: - X-ELF-Tenant-Id, X-ELF-Project-Id, X-ELF-Agent-Id @@ -1946,7 +1946,7 @@ Original query: - elf_admin_events_ingestion_profile_get -> GET /v2/admin/events/ingestion-profiles/{profile_id} - elf_admin_events_ingestion_profile_versions_list -> GET /v2/admin/events/ingestion-profiles/{profile_id}/versions - elf_admin_events_ingestion_profile_default_get -> GET /v2/admin/events/ingestion-profiles/default - - elf_admin_events_ingestion_profile_default_set -> POST /v2/admin/events/ingestion-profiles/default + - elf_admin_events_ingestion_profile_default_set -> PUT /v2/admin/events/ingestion-profiles/default - elf_admin_traces_recent_list -> GET /v2/admin/traces/recent - elf_admin_trace_get -> GET /v2/admin/traces/{trace_id} - elf_admin_trajectory_get -> GET /v2/admin/trajectories/{trace_id} diff --git a/packages/elf-chunking/src/lib.rs b/packages/elf-chunking/src/lib.rs index 42d6deac..bc0fe4a8 100644 --- a/packages/elf-chunking/src/lib.rs +++ b/packages/elf-chunking/src/lib.rs @@ -2,6 +2,8 @@ pub use tokenizers::{Error, Tokenizer}; +use std::path::Path; + use unicode_segmentation::UnicodeSegmentation; /// Token-window settings used when splitting text into chunks. @@ -26,8 +28,14 @@ pub struct Chunk { pub text: String, } -/// Loads a Hugging Face tokenizer by repository identifier. +/// Loads a tokenizer from a local JSON file path or Hugging Face repository identifier. pub fn load_tokenizer(repo: &str) -> Result { + let path = Path::new(repo); + + if path.exists() && path.is_file() { + return Tokenizer::from_file(path); + } + Tokenizer::from_pretrained(repo, None) } diff --git a/packages/elf-config/tests/config_validation.rs b/packages/elf-config/tests/config_validation.rs index ae6ec892..c2b92c42 100644 --- a/packages/elf-config/tests/config_validation.rs +++ b/packages/elf-config/tests/config_validation.rs @@ -13,7 +13,7 @@ use std::{ use toml::Value; -use elf_config::{self, Config, Context, Error}; +use elf_config::{self, Config, Context, Error, MemoryPolicyRule}; const SAMPLE_CONFIG_TEMPLATE_TOML: &str = include_str!("fixtures/sample_config.template.toml"); @@ -515,10 +515,10 @@ fn security_auth_keys_require_known_read_profile() { fn memory_policy_min_confidence_must_be_finite() { let mut cfg = base_config(); - cfg.memory.policy.rules.push(elf_config::MemoryPolicyRule { - min_confidence: Some(f32::NAN), - ..Default::default() - }); + cfg.memory + .policy + .rules + .push(MemoryPolicyRule { min_confidence: Some(f32::NAN), ..Default::default() }); let err = elf_config::validate(&cfg).expect_err("Expected min_confidence validation error."); @@ -535,7 +535,7 @@ fn memory_policy_min_confidence_must_be_in_range() { cfg.memory .policy .rules - .push(elf_config::MemoryPolicyRule { min_confidence: Some(1.01), ..Default::default() }); + .push(MemoryPolicyRule { min_confidence: Some(1.01), ..Default::default() }); let err = elf_config::validate(&cfg).expect_err("Expected min_confidence range validation error."); @@ -551,10 +551,10 @@ fn memory_policy_min_confidence_must_be_in_range() { fn memory_policy_min_importance_must_be_finite() { let mut cfg = base_config(); - cfg.memory.policy.rules.push(elf_config::MemoryPolicyRule { - min_importance: Some(f32::INFINITY), - ..Default::default() - }); + cfg.memory + .policy + .rules + .push(MemoryPolicyRule { min_importance: Some(f32::INFINITY), ..Default::default() }); let err = elf_config::validate(&cfg).expect_err("Expected min_importance validation error."); @@ -571,7 +571,7 @@ fn memory_policy_min_importance_must_be_in_range() { cfg.memory .policy .rules - .push(elf_config::MemoryPolicyRule { min_importance: Some(-0.01), ..Default::default() }); + .push(MemoryPolicyRule { min_importance: Some(-0.01), ..Default::default() }); let err = elf_config::validate(&cfg).expect_err("Expected min_importance range validation error."); @@ -587,10 +587,10 @@ fn memory_policy_min_importance_must_be_in_range() { fn memory_policy_note_type_must_be_known_value() { let mut cfg = base_config(); - cfg.memory.policy.rules.push(elf_config::MemoryPolicyRule { - note_type: Some("unknown".to_string()), - ..Default::default() - }); + cfg.memory + .policy + .rules + .push(MemoryPolicyRule { note_type: Some("unknown".to_string()), ..Default::default() }); let err = elf_config::validate(&cfg).expect_err("Expected note_type validation error."); @@ -606,10 +606,10 @@ fn memory_policy_note_type_must_be_known_value() { fn memory_policy_scope_must_be_allowed() { let mut cfg = base_config(); - cfg.memory.policy.rules.push(elf_config::MemoryPolicyRule { - scope: Some("invalid_scope".to_string()), - ..Default::default() - }); + cfg.memory + .policy + .rules + .push(MemoryPolicyRule { scope: Some("invalid_scope".to_string()), ..Default::default() }); let err = elf_config::validate(&cfg).expect_err("Expected scope validation error."); @@ -639,10 +639,10 @@ fn memory_policy_rule_pairs_must_be_unique() { fn memory_policy_note_type_must_not_be_whitespace_only() { let mut cfg = base_config(); - cfg.memory.policy.rules.push(elf_config::MemoryPolicyRule { - note_type: Some(" ".to_string()), - ..Default::default() - }); + cfg.memory + .policy + .rules + .push(MemoryPolicyRule { note_type: Some(" ".to_string()), ..Default::default() }); let err = elf_config::validate(&cfg).expect_err("Expected whitespace note_type validation error."); @@ -658,10 +658,10 @@ fn memory_policy_note_type_must_not_be_whitespace_only() { fn memory_policy_scope_must_not_be_whitespace_only() { let mut cfg = base_config(); - cfg.memory.policy.rules.push(elf_config::MemoryPolicyRule { - scope: Some(" ".to_string()), - ..Default::default() - }); + cfg.memory + .policy + .rules + .push(MemoryPolicyRule { scope: Some(" ".to_string()), ..Default::default() }); let err = elf_config::validate(&cfg).expect_err("Expected whitespace scope validation error."); diff --git a/packages/elf-service/Cargo.toml b/packages/elf-service/Cargo.toml index 4ffaf5b3..87c4744a 100644 --- a/packages/elf-service/Cargo.toml +++ b/packages/elf-service/Cargo.toml @@ -15,6 +15,7 @@ tokenizers = { workspace = true } tracing = { workspace = true } uuid = { workspace = true } +elf-chunking = { workspace = true } elf-config = { workspace = true } elf-domain = { workspace = true } elf-providers = { workspace = true } @@ -25,6 +26,5 @@ ahash = { workspace = true } axum = { workspace = true } tokio = { workspace = true } -elf-chunking = { workspace = true } -elf-testkit = { workspace = true } -elf-worker = { workspace = true } +elf-testkit = { workspace = true } +elf-worker = { workspace = true } diff --git a/packages/elf-service/src/add_event.rs b/packages/elf-service/src/add_event.rs index de806a1d..753fd5f2 100644 --- a/packages/elf-service/src/add_event.rs +++ b/packages/elf-service/src/add_event.rs @@ -8,8 +8,10 @@ use uuid::Uuid; use crate::{ ElfService, Error, InsertVersionArgs, NoteOp, REJECT_EVIDENCE_MISMATCH, - REJECT_WRITE_POLICY_MISMATCH, ResolveUpdateArgs, Result, UpdateDecision, access, - graph_ingestion, ingest_audit, + REJECT_WRITE_POLICY_MISMATCH, ResolveUpdateArgs, Result, UpdateDecision, + access::{self, ORG_PROJECT_ID}, + graph_ingestion, + ingest_audit::{self, IngestAuditArgs}, ingestion_profiles::{self, IngestionProfileRef, IngestionProfileSelector}, structured_fields::{self, StructuredFields}, }; @@ -18,7 +20,7 @@ use elf_domain::{ english_gate, evidence, memory_policy::{self, MemoryPolicyDecision}, ttl, - writegate::{self, WritePolicy, WritePolicyAudit, WritePolicyError}, + writegate::{self, NoteInput, WritePolicy, WritePolicyAudit, WritePolicyError}, }; use elf_storage::models::MemoryNote; @@ -266,7 +268,7 @@ impl ElfService { ) -> Result { let note_data = NoteProcessingData::from_request_and_note(req, ¬e); let effective_project_id = if note_data.scope.trim() == "org_shared" { - access::ORG_PROJECT_ID + ORG_PROJECT_ID } else { req.project_id.as_str() }; @@ -571,7 +573,7 @@ impl ElfService { providers: &self.providers, tenant_id: req.tenant_id.as_str(), project_id: if note_data.scope.trim() == "org_shared" { - access::ORG_PROJECT_ID + ORG_PROJECT_ID } else { req.project_id.as_str() }, @@ -1141,7 +1143,7 @@ fn reject_extracted_note_if_writegate_rejects( scope: &str, text: &str, ) -> Option { - let gate_input = elf_domain::writegate::NoteInput { + let gate_input = NoteInput { note_type: note_type.to_string(), scope: scope.to_string(), text: text.to_string(), @@ -1221,7 +1223,7 @@ async fn record_ingest_decision( graph_present: bool, write_policy_audits: Option>, ) -> Result<()> { - let args = crate::ingest_audit::IngestAuditArgs { + let args = IngestAuditArgs { tenant_id: ctx.tenant_id, project_id: ctx.project_id, agent_id: ctx.agent_id, diff --git a/packages/elf-service/src/add_note.rs b/packages/elf-service/src/add_note.rs index 9344d926..5cb433e6 100644 --- a/packages/elf-service/src/add_note.rs +++ b/packages/elf-service/src/add_note.rs @@ -8,7 +8,10 @@ use uuid::Uuid; use crate::{ ElfService, Error, InsertVersionArgs, NoteOp, ResolveUpdateArgs, Result, UpdateDecision, - UpdateDecisionMetadata, access, graph_ingestion, ingest_audit, + UpdateDecisionMetadata, + access::{self, ORG_PROJECT_ID}, + graph_ingestion, + ingest_audit::{self, IngestAuditArgs}, structured_fields::{self, StructuredFields}, }; use elf_config::Config; @@ -16,7 +19,7 @@ use elf_domain::{ english_gate, memory_policy::{self, MemoryPolicyDecision}, ttl, - writegate::{self, WritePolicy, WritePolicyAudit, WritePolicyError}, + writegate::{self, NoteInput, WritePolicy, WritePolicyAudit, WritePolicyError}, }; use elf_storage::models::MemoryNote; @@ -107,7 +110,7 @@ impl ElfService { let embed_version = crate::embedding_version(&self.cfg); let AddNoteRequest { tenant_id, project_id, agent_id, scope, notes } = req; let effective_project_id = - if scope.trim() == "org_shared" { access::ORG_PROJECT_ID } else { project_id.as_str() }; + if scope.trim() == "org_shared" { ORG_PROJECT_ID } else { project_id.as_str() }; let mut results = Vec::with_capacity(notes.len()); for (note_idx, note) in notes.into_iter().enumerate() { @@ -150,7 +153,7 @@ impl ElfService { return Ok(result); } - let (decision, metadata) = self.resolve_update_decision(ctx, ¬e).await?; + let (decision, metadata) = self.resolve_update_decision(&mut tx, ctx, ¬e).await?; let base_decision = Self::base_decision_for_update(&decision, structured_present, graph_present); let (policy_decision, decision_policy_rule, min_confidence, min_importance) = @@ -268,11 +271,12 @@ impl ElfService { async fn resolve_update_decision( &self, + tx: &mut Transaction<'_, Postgres>, ctx: &AddNoteContext<'_>, note: &AddNoteInput, ) -> Result<(UpdateDecision, UpdateDecisionMetadata)> { let decision = crate::resolve_update( - &self.db.pool, + &mut **tx, ResolveUpdateArgs { cfg: &self.cfg, providers: &self.providers, @@ -437,7 +441,7 @@ impl ElfService { min_importance: Option, write_policy_audit: Option, ) -> Result<()> { - let decision = crate::ingest_audit::IngestAuditArgs { + let decision = IngestAuditArgs { tenant_id: ctx.tenant_id, project_id: ctx.project_id, agent_id: ctx.agent_id, @@ -894,7 +898,7 @@ fn reject_note_if_writegate_rejects( scope: &str, note: &AddNoteInput, ) -> Option { - let gate_input = elf_domain::writegate::NoteInput { + let gate_input = NoteInput { note_type: note.r#type.clone(), scope: scope.to_string(), text: note.text.clone(), diff --git a/packages/elf-service/src/delete.rs b/packages/elf-service/src/delete.rs index 0570d724..34b2fc7f 100644 --- a/packages/elf-service/src/delete.rs +++ b/packages/elf-service/src/delete.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use uuid::Uuid; -use crate::{ElfService, Error, InsertVersionArgs, NoteOp, Result, access}; +use crate::{ElfService, Error, InsertVersionArgs, NoteOp, Result, access::ORG_PROJECT_ID}; use elf_storage::models::MemoryNote; /// Request payload for note deletion. @@ -54,7 +54,7 @@ FOR UPDATE", .bind(req.note_id) .bind(tenant_id) .bind(project_id) - .bind(access::ORG_PROJECT_ID) + .bind(ORG_PROJECT_ID) .fetch_optional(&mut *tx) .await? .ok_or_else(|| Error::InvalidRequest { message: "Note not found.".to_string() })?; diff --git a/packages/elf-service/src/docs.rs b/packages/elf-service/src/docs.rs index 55196442..ec9b652b 100644 --- a/packages/elf-service/src/docs.rs +++ b/packages/elf-service/src/docs.rs @@ -21,7 +21,7 @@ use uuid::Uuid; use crate::{ ElfService, Error, Result, - access::{self, SharedSpaceGrantKey}, + access::{self, ORG_PROJECT_ID, SharedSpaceGrantKey}, search, }; use elf_config::Config; @@ -558,11 +558,8 @@ impl ElfService { let DocsPutRequest { tenant_id, project_id, agent_id, scope, title, source_ref, .. } = req; let chunking_profile = resolve_doc_chunking_profile(doc_type); let tokenizer = load_tokenizer(&self.cfg)?; - let effective_project_id = if scope.trim() == "org_shared" { - crate::access::ORG_PROJECT_ID - } else { - project_id.as_str() - }; + let effective_project_id = + if scope.trim() == "org_shared" { ORG_PROJECT_ID } else { project_id.as_str() }; let content_bytes = content.len(); let content_hash = blake3::hash(content.as_bytes()); let doc_id = Uuid::new_v4(); @@ -688,7 +685,7 @@ LIMIT 1", .bind(req.doc_id) .bind(tenant_id) .bind(project_id) - .bind(crate::access::ORG_PROJECT_ID) + .bind(ORG_PROJECT_ID) .fetch_optional(&self.db.pool) .await?; let Some(row) = row else { @@ -1699,7 +1696,7 @@ fn load_tokenizer(cfg: &Config) -> Result { }); } - Tokenizer::from_pretrained(tokenizer_repo, None).map_err(|err| Error::InvalidRequest { + elf_chunking::load_tokenizer(tokenizer_repo).map_err(|err| Error::InvalidRequest { message: format!("failed to load tokenizer: {err}"), }) } @@ -1807,7 +1804,7 @@ fn build_doc_search_filter( if allowed_scopes.iter().any(|scope| scope == "org_shared") { let org_filter = Filter::all([ - Condition::matches("project_id", crate::access::ORG_PROJECT_ID.to_string()), + Condition::matches("project_id", ORG_PROJECT_ID.to_string()), Condition::matches("scope", "org_shared".to_string()), ]); @@ -2164,7 +2161,7 @@ LIMIT 1", .bind(doc_id) .bind(tenant_id) .bind(project_id) - .bind(crate::access::ORG_PROJECT_ID) + .bind(ORG_PROJECT_ID) .fetch_optional(executor) .await?; @@ -2303,7 +2300,7 @@ WHERE c.chunk_id = ANY($1) .bind(tenant_id) .bind(project_id) .bind(status) - .bind(crate::access::ORG_PROJECT_ID) + .bind(ORG_PROJECT_ID) .fetch_all(executor) .await?; let mut map = HashMap::with_capacity(rows.len()); diff --git a/packages/elf-service/src/graph_query.rs b/packages/elf-service/src/graph_query.rs index eca25bd6..f949aa83 100644 --- a/packages/elf-service/src/graph_query.rs +++ b/packages/elf-service/src/graph_query.rs @@ -7,7 +7,11 @@ use sqlx::{FromRow, PgConnection}; use time::OffsetDateTime; use uuid::Uuid; -use crate::{ElfService, Error, Result, access, search}; +use crate::{ + ElfService, Error, Result, + access::{self, ORG_PROJECT_ID}, + search, +}; use elf_storage::{graph, models::GraphEntity}; /// Schema identifier for graph-query responses. @@ -676,7 +680,7 @@ async fn fetch_graph_query_rows( .bind(shared_scope_keys) .bind(limit_plus_one) .bind(GRAPH_QUERY_EVIDENCE_LIMIT) - .bind(crate::access::ORG_PROJECT_ID) + .bind(ORG_PROJECT_ID) .bind(predicate_id) .fetch_all(conn) .await?; diff --git a/packages/elf-service/src/list.rs b/packages/elf-service/src/list.rs index 5f21e7ab..d1e94803 100644 --- a/packages/elf-service/src/list.rs +++ b/packages/elf-service/src/list.rs @@ -8,7 +8,10 @@ use sqlx::{PgPool, QueryBuilder}; use time::OffsetDateTime; use uuid::Uuid; -use crate::{ElfService, Error, Result, access}; +use crate::{ + ElfService, Error, Result, + access::{self, ORG_PROJECT_ID}, +}; use elf_storage::models::MemoryNote; /// Request payload for note listing. @@ -233,7 +236,7 @@ async fn list_notes( builder.push(" AND (project_id = "); builder.push_bind(project_id); builder.push(" OR (project_id = "); - builder.push_bind(access::ORG_PROJECT_ID); + builder.push_bind(ORG_PROJECT_ID); builder.push(" AND scope = "); builder.push_bind("org_shared"); builder.push("))"); diff --git a/packages/elf-service/src/notes.rs b/packages/elf-service/src/notes.rs index 4bad76ab..5b4a2f5d 100644 --- a/packages/elf-service/src/notes.rs +++ b/packages/elf-service/src/notes.rs @@ -8,7 +8,8 @@ use time::OffsetDateTime; use uuid::Uuid; use crate::{ - ElfService, Error, Result, access, + ElfService, Error, Result, + access::{self, ORG_PROJECT_ID}, structured_fields::{self, StructuredFields}, }; use elf_storage::models::MemoryNote; @@ -93,7 +94,7 @@ WHERE note_id = $1 .bind(req.note_id) .bind(tenant_id) .bind(project_id) - .bind(access::ORG_PROJECT_ID) + .bind(ORG_PROJECT_ID) .fetch_optional(&self.db.pool) .await?; let Some(note) = row else { diff --git a/packages/elf-service/src/progressive_search.rs b/packages/elf-service/src/progressive_search.rs index 951a3aa9..32a8b50d 100644 --- a/packages/elf-service/src/progressive_search.rs +++ b/packages/elf-service/src/progressive_search.rs @@ -15,7 +15,7 @@ use uuid::Uuid; use crate::{ ElfService, NoteFetchResponse, PayloadLevel, QueryPlan, SearchRequest, SearchTrajectorySummary, - access::{self, SharedSpaceGrantKey}, + access::{self, ORG_PROJECT_ID, SharedSpaceGrantKey}, structured_fields::{self, StructuredFields}, }; use elf_config::Config; @@ -632,7 +632,7 @@ WHERE note_id = ANY($1::uuid[]) .bind(requested_in_session.as_slice()) .bind(session.tenant_id.as_str()) .bind(session.project_id.as_str()) - .bind(access::ORG_PROJECT_ID) + .bind(ORG_PROJECT_ID) .fetch_all(&self.db.pool) .await?; @@ -880,17 +880,26 @@ fn truncate_chars(raw: &str, max_chars: usize) -> String { return raw.to_string(); } - let mut out = String::with_capacity(max_chars + 3); + const TRUNCATION_MARKER: &str = "..."; + + let marker_chars = TRUNCATION_MARKER.chars().count(); + + if max_chars <= marker_chars { + return TRUNCATION_MARKER.chars().take(max_chars).collect(); + } + + let truncated_chars = max_chars - marker_chars; + let mut out = String::with_capacity(max_chars); for (idx, ch) in raw.chars().enumerate() { - if idx >= max_chars { + if idx >= truncated_chars { break; } out.push(ch); } - out.push_str("..."); + out.push_str(TRUNCATION_MARKER); out } diff --git a/packages/elf-service/src/search.rs b/packages/elf-service/src/search.rs index 10d17392..4fbbc268 100644 --- a/packages/elf-service/src/search.rs +++ b/packages/elf-service/src/search.rs @@ -21,7 +21,11 @@ use sqlx::{FromRow, PgConnection, PgExecutor, PgPool, QueryBuilder, Row}; use time::{Duration, OffsetDateTime}; use uuid::Uuid; -use crate::{ElfService, Result, access, ranking_explain_v2}; +use crate::{ + ElfService, Result, + access::{self, ORG_PROJECT_ID}, + ranking_explain_v2::{self, SEARCH_RANKING_EXPLAIN_SCHEMA_V2, TraceTermsArgs}, +}; use elf_config::{Config, SearchCache}; use elf_domain::english_gate; use elf_storage::{ @@ -3432,7 +3436,7 @@ LIMIT $7", .bind(args.non_private_scopes) .bind(args.vec_text) .bind(args.retrieval_limit) - .bind(access::ORG_PROJECT_ID) + .bind(ORG_PROJECT_ID) .fetch_all(&self.db.pool) .await?; @@ -3476,7 +3480,7 @@ LIMIT $8", .bind(args.non_private_scopes) .bind(args.vec_text) .bind(args.retrieval_limit) - .bind(access::ORG_PROJECT_ID) + .bind(ORG_PROJECT_ID) .fetch_all(&self.db.pool) .await?; @@ -4046,7 +4050,7 @@ ORDER BY c.note_id ASC, e.vec <=> $3::text::vector ASC", let mut scored = Vec::with_capacity(snippet_items.len()); for ((item, rerank_score), rerank_rank) in - snippet_items.into_iter().zip(scores.into_iter()).zip(rerank_ranks.into_iter()) + snippet_items.into_iter().zip(scores).zip(rerank_ranks) { scored.push(score_chunk_candidate(&score_ctx, item, rerank_score, rerank_rank)); } @@ -4600,7 +4604,7 @@ WHERE note_id = ANY($1::uuid[]) .bind(candidate_note_ids) .bind(tenant_id) .bind(project_id) - .bind(access::ORG_PROJECT_ID) + .bind(ORG_PROJECT_ID) .fetch_all(&self.db.pool) .await?; let mut note_meta = HashMap::new(); @@ -4954,7 +4958,7 @@ fn build_search_filter( if allowed_scopes.iter().any(|scope| scope == "org_shared") { let org_filter = Filter::all([ - Condition::matches("project_id", access::ORG_PROJECT_ID.to_string()), + Condition::matches("project_id", ORG_PROJECT_ID.to_string()), Condition::matches("scope", "org_shared".to_string()), ]); @@ -5147,34 +5151,31 @@ fn build_search_item_and_trace_item( matched_fields, args.structured_matches.get(&args.scored_chunk.item.note.note_id), ); - let trace_terms = - ranking_explain_v2::build_trace_terms_v2(ranking_explain_v2::TraceTermsArgs { - cfg: args.cfg, - blend_enabled: args.blend_policy.enabled, - retrieval_normalization: args.blend_policy.retrieval_normalization.as_str(), - rerank_normalization: args.blend_policy.rerank_normalization.as_str(), - blend_retrieval_weight: args.scored_chunk.blend_retrieval_weight, - retrieval_rank: args.scored_chunk.item.retrieval_rank, - retrieval_norm: args.scored_chunk.retrieval_norm, - retrieval_term: args.scored_chunk.retrieval_term, - rerank_score: args.scored_chunk.rerank_score, - rerank_rank: args.scored_chunk.rerank_rank, - rerank_norm: args.scored_chunk.rerank_norm, - rerank_term: args.scored_chunk.rerank_term, - tie_breaker_score: args.scored_chunk.tie_breaker_score, - importance: args.scored_chunk.importance, - age_days: args.scored_chunk.age_days, - scope: args.scored_chunk.item.note.scope.as_str(), - scope_context_boost: args.scored_chunk.scope_context_boost, - deterministic_lexical_overlap_ratio: args - .scored_chunk - .deterministic_lexical_overlap_ratio, - deterministic_lexical_bonus: args.scored_chunk.deterministic_lexical_bonus, - deterministic_hit_count: args.scored_chunk.deterministic_hit_count, - deterministic_last_hit_age_days: args.scored_chunk.deterministic_last_hit_age_days, - deterministic_hit_boost: args.scored_chunk.deterministic_hit_boost, - deterministic_decay_penalty: args.scored_chunk.deterministic_decay_penalty, - }); + let trace_terms = ranking_explain_v2::build_trace_terms_v2(TraceTermsArgs { + cfg: args.cfg, + blend_enabled: args.blend_policy.enabled, + retrieval_normalization: args.blend_policy.retrieval_normalization.as_str(), + rerank_normalization: args.blend_policy.rerank_normalization.as_str(), + blend_retrieval_weight: args.scored_chunk.blend_retrieval_weight, + retrieval_rank: args.scored_chunk.item.retrieval_rank, + retrieval_norm: args.scored_chunk.retrieval_norm, + retrieval_term: args.scored_chunk.retrieval_term, + rerank_score: args.scored_chunk.rerank_score, + rerank_rank: args.scored_chunk.rerank_rank, + rerank_norm: args.scored_chunk.rerank_norm, + rerank_term: args.scored_chunk.rerank_term, + tie_breaker_score: args.scored_chunk.tie_breaker_score, + importance: args.scored_chunk.importance, + age_days: args.scored_chunk.age_days, + scope: args.scored_chunk.item.note.scope.as_str(), + scope_context_boost: args.scored_chunk.scope_context_boost, + deterministic_lexical_overlap_ratio: args.scored_chunk.deterministic_lexical_overlap_ratio, + deterministic_lexical_bonus: args.scored_chunk.deterministic_lexical_bonus, + deterministic_hit_count: args.scored_chunk.deterministic_hit_count, + deterministic_last_hit_age_days: args.scored_chunk.deterministic_last_hit_age_days, + deterministic_hit_boost: args.scored_chunk.deterministic_hit_boost, + deterministic_decay_penalty: args.scored_chunk.deterministic_decay_penalty, + }); let response_terms = ranking_explain_v2::strip_term_inputs(&trace_terms); let relation_context = args.relation_contexts.get(&args.scored_chunk.item.note.note_id).cloned(); @@ -5191,7 +5192,7 @@ fn build_search_item_and_trace_item( matched_fields: matched_fields.clone(), }, ranking: SearchRankingExplain { - schema: ranking_explain_v2::SEARCH_RANKING_EXPLAIN_SCHEMA_V2.to_string(), + schema: SEARCH_RANKING_EXPLAIN_SCHEMA_V2.to_string(), policy_id: args.policy_id.to_string(), final_score: args.scored_chunk.final_score, terms: response_terms, @@ -5202,7 +5203,7 @@ fn build_search_item_and_trace_item( let trace_explain = SearchExplain { r#match: SearchMatchExplain { matched_terms, matched_fields }, ranking: SearchRankingExplain { - schema: ranking_explain_v2::SEARCH_RANKING_EXPLAIN_SCHEMA_V2.to_string(), + schema: SEARCH_RANKING_EXPLAIN_SCHEMA_V2.to_string(), policy_id: args.policy_id.to_string(), final_score: args.scored_chunk.final_score, terms: trace_terms, @@ -5704,7 +5705,7 @@ fn build_replay_items( let mut out = Vec::with_capacity(results.len()); for scored in results { - let terms = ranking_explain_v2::build_trace_terms_v2(ranking_explain_v2::TraceTermsArgs { + let terms = ranking_explain_v2::build_trace_terms_v2(TraceTermsArgs { cfg, blend_enabled: blend_policy.enabled, retrieval_normalization: blend_policy.retrieval_normalization.as_str(), @@ -5732,7 +5733,7 @@ fn build_replay_items( let explain = SearchExplain { r#match: SearchMatchExplain { matched_terms: Vec::new(), matched_fields: Vec::new() }, ranking: SearchRankingExplain { - schema: ranking_explain_v2::SEARCH_RANKING_EXPLAIN_SCHEMA_V2.to_string(), + schema: SEARCH_RANKING_EXPLAIN_SCHEMA_V2.to_string(), policy_id: policy_id.to_string(), final_score: scored.final_score, terms, diff --git a/packages/elf-service/src/sharing.rs b/packages/elf-service/src/sharing.rs index 95311e5d..7687f723 100644 --- a/packages/elf-service/src/sharing.rs +++ b/packages/elf-service/src/sharing.rs @@ -6,7 +6,10 @@ use serde::{Deserialize, Serialize}; use sqlx::FromRow; use uuid::Uuid; -use crate::{ElfService, Error, InsertVersionArgs, access}; +use crate::{ + ElfService, Error, InsertVersionArgs, + access::{self, ORG_PROJECT_ID}, +}; use elf_storage::models::MemoryNote; const PROJECT_SPACE_GRANT_UPSERT_SQL: &str = "\ @@ -270,7 +273,7 @@ FOR UPDATE", .bind(req.note_id) .bind(tenant_id) .bind(project_id) - .bind(access::ORG_PROJECT_ID) + .bind(ORG_PROJECT_ID) .fetch_optional(&mut *tx) .await? .ok_or_else(|| Error::InvalidRequest { message: "Note not found.".to_string() })?; @@ -296,8 +299,7 @@ FOR UPDATE", return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); } - let target_project_id = - if scope == "org_shared" { access::ORG_PROJECT_ID } else { project_id }; + let target_project_id = if scope == "org_shared" { ORG_PROJECT_ID } else { project_id }; access::ensure_active_project_scope_grant( &mut *tx, @@ -377,7 +379,7 @@ FOR UPDATE", .bind(req.note_id) .bind(tenant_id) .bind(project_id) - .bind(access::ORG_PROJECT_ID) + .bind(ORG_PROJECT_ID) .fetch_optional(&mut *tx) .await? .ok_or_else(|| Error::InvalidRequest { message: "Note not found.".to_string() })?; @@ -401,7 +403,7 @@ FOR UPDATE", let now = time::OffsetDateTime::now_utc(); let prev_snapshot = crate::note_snapshot(¬e); - if note.scope == "org_shared" && note.project_id == access::ORG_PROJECT_ID { + if note.scope == "org_shared" && note.project_id == ORG_PROJECT_ID { note.project_id = project_id.to_string(); } @@ -486,8 +488,7 @@ FOR UPDATE", let grantee_agent_id_ref = grantee_agent_id.as_deref(); let now = time::OffsetDateTime::now_utc(); - let effective_project_id = - if scope == "org_shared" { access::ORG_PROJECT_ID } else { project_id }; + let effective_project_id = if scope == "org_shared" { ORG_PROJECT_ID } else { project_id }; if req.grantee_kind == GranteeKind::Project { self.upsert_project_grant(tenant_id, effective_project_id, scope, agent_id, now) @@ -604,8 +605,7 @@ FOR UPDATE", return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); } - let effective_project_id = - if scope == "org_shared" { access::ORG_PROJECT_ID } else { project_id }; + let effective_project_id = if scope == "org_shared" { ORG_PROJECT_ID } else { project_id }; let revocation = sqlx::query( "\ UPDATE memory_space_grants @@ -667,8 +667,7 @@ WHERE tenant_id = $1 return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); } - let effective_project_id = - if scope == "org_shared" { access::ORG_PROJECT_ID } else { project_id }; + let effective_project_id = if scope == "org_shared" { ORG_PROJECT_ID } else { project_id }; #[derive(FromRow)] struct Row { diff --git a/packages/elf-service/src/update.rs b/packages/elf-service/src/update.rs index bc938391..b508a522 100644 --- a/packages/elf-service/src/update.rs +++ b/packages/elf-service/src/update.rs @@ -6,8 +6,11 @@ use sqlx::{Postgres, Transaction}; use time::OffsetDateTime; use uuid::Uuid; -use crate::{ElfService, Error, InsertVersionArgs, NoteOp, Result, access}; -use elf_domain::{english_gate, ttl, writegate}; +use crate::{ElfService, Error, InsertVersionArgs, NoteOp, Result, access::ORG_PROJECT_ID}; +use elf_domain::{ + english_gate, ttl, + writegate::{self, NoteInput}, +}; use elf_storage::models::MemoryNote; /// Request payload for note updates. @@ -79,7 +82,7 @@ impl ElfService { } else { note.text.clone() }; - let gate = elf_domain::writegate::NoteInput { + let gate = NoteInput { note_type: note.r#type.clone(), scope: note.scope.clone(), text: candidate_text, @@ -166,7 +169,7 @@ FOR UPDATE", .bind(note_id) .bind(tenant_id) .bind(project_id) - .bind(access::ORG_PROJECT_ID) + .bind(ORG_PROJECT_ID) .fetch_optional(&mut **tx) .await? .ok_or_else(|| Error::InvalidRequest { message: "Note not found.".to_string() }) diff --git a/packages/elf-service/tests/acceptance/chunk_search.rs b/packages/elf-service/tests/acceptance/chunk_search.rs index 9223c32c..fddc5124 100644 --- a/packages/elf-service/tests/acceptance/chunk_search.rs +++ b/packages/elf-service/tests/acceptance/chunk_search.rs @@ -1044,7 +1044,12 @@ async fn search_details_payload_level_shapes_text_and_fields() { }; let note_id = Uuid::new_v4(); let chunk_id = Uuid::new_v4(); - let note_text = "This is the long note body used for detail shaping. It contains enough tokens to show truncation and should be reduced for compact payload levels."; + let note_text = concat!( + "This is the long note body used for detail shaping. It contains enough tokens to show ", + "truncation and should be reduced for compact payload levels. The extra detail keeps ", + "running with repeated operational context about source references, structured fields, ", + "session hydration, ranking metadata, and payload contracts so l0 cannot equal the raw note.", + ); let source_ref = serde_json::json!({ "schema": "note_source_ref/v1", "locator": { diff --git a/packages/elf-service/tests/acceptance/docs_extension_v1.rs b/packages/elf-service/tests/acceptance/docs_extension_v1.rs index 66b417dc..f110596a 100644 --- a/packages/elf-service/tests/acceptance/docs_extension_v1.rs +++ b/packages/elf-service/tests/acceptance/docs_extension_v1.rs @@ -17,7 +17,7 @@ use tokio::{ }; use uuid::Uuid; -use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; +use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank, chunking::ChunkingConfig}; use elf_config::EmbeddingProviderConfig; use elf_service::{ AddNoteInput, AddNoteRequest, BoxFuture, DocsExcerptsGetRequest, DocsGetRequest, @@ -27,7 +27,7 @@ use elf_service::{ }; use elf_storage::{db::Db, qdrant::QdrantStore}; use elf_testkit::TestDatabase; -use elf_worker::worker; +use elf_worker::worker::{self, WorkerState}; const TEST_CONTENT: &str = "ELF docs extension v1 stores evidence. Keyword: peregrine.\nSecond sentence for chunking."; @@ -1876,7 +1876,7 @@ async fn assert_doc_excerpt(service: &ElfService, doc_id: Uuid, content_hash: &s async fn spawn_doc_worker(service: &ElfService) -> (JoinHandle<()>, Sender<()>) { let (api_base, shutdown) = start_embed_server().await; - let worker_state = worker::WorkerState { + let worker_state = WorkerState { db: Db::connect(&service.cfg.storage.postgres).await.expect("Failed to connect worker DB."), qdrant: QdrantStore::new(&service.cfg.storage.qdrant) .expect("Failed to build Qdrant store."), @@ -1895,7 +1895,7 @@ async fn spawn_doc_worker(service: &ElfService) -> (JoinHandle<()>, Sender<()>) timeout_ms: 1_000, default_headers: Map::new(), }, - chunking: crate::acceptance::chunking::ChunkingConfig { max_tokens: 64, overlap_tokens: 8 }, + chunking: ChunkingConfig { max_tokens: 64, overlap_tokens: 8 }, tokenizer: build_test_tokenizer(), }; let handle = tokio::spawn(async move { diff --git a/packages/elf-service/tests/acceptance/graph_ingestion.rs b/packages/elf-service/tests/acceptance/graph_ingestion.rs index 0e4596e2..639c9096 100644 --- a/packages/elf-service/tests/acceptance/graph_ingestion.rs +++ b/packages/elf-service/tests/acceptance/graph_ingestion.rs @@ -8,7 +8,7 @@ use sqlx::{FromRow, PgPool}; use time::OffsetDateTime; use uuid::Uuid; -use crate::acceptance; +use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; use elf_config::EmbeddingProviderConfig; use elf_domain::memory_policy::MemoryPolicyDecision; use elf_service::{ @@ -384,8 +384,8 @@ async fn add_note_duplicate_fact_attaches_multiple_evidence() { }; let providers = Providers::new( Arc::new(HashEmbedding { vector_dim: 4_096 }), - Arc::new(crate::acceptance::StubRerank), - Arc::new(crate::acceptance::SpyExtractor { + Arc::new(StubRerank), + Arc::new(SpyExtractor { calls: Arc::new(AtomicUsize::new(0)), payload: serde_json::json!({ "notes": [] }), }), @@ -457,9 +457,9 @@ async fn add_note_single_predicate_supersedes_conflicting_fact() { return; }; let providers = Providers::new( - Arc::new(crate::acceptance::StubEmbedding { vector_dim: 4_096 }), - Arc::new(crate::acceptance::StubRerank), - Arc::new(crate::acceptance::SpyExtractor { + Arc::new(StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(SpyExtractor { calls: Arc::new(AtomicUsize::new(0)), payload: serde_json::json!({ "notes": [] }), }), @@ -541,9 +541,9 @@ async fn add_note_invalid_relation_rejected_has_field_path() { return; }; let providers = Providers::new( - Arc::new(crate::acceptance::StubEmbedding { vector_dim: 4_096 }), - Arc::new(crate::acceptance::StubRerank), - Arc::new(crate::acceptance::SpyExtractor { + Arc::new(StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(SpyExtractor { calls: Arc::new(AtomicUsize::new(0)), payload: serde_json::json!({ "notes": [] }), }), @@ -615,9 +615,9 @@ async fn add_note_persists_graph_relations() { return; }; let providers = Providers::new( - Arc::new(crate::acceptance::StubEmbedding { vector_dim: 4_096 }), - Arc::new(crate::acceptance::StubRerank), - Arc::new(crate::acceptance::SpyExtractor { + Arc::new(StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(SpyExtractor { calls: Arc::new(AtomicUsize::new(0)), payload: serde_json::json!({ "notes": [] }), }), @@ -719,12 +719,9 @@ async fn add_event_persists_graph_relations() { }] }); let providers = Providers::new( - Arc::new(crate::acceptance::StubEmbedding { vector_dim: 4_096 }), - Arc::new(crate::acceptance::StubRerank), - Arc::new(crate::acceptance::SpyExtractor { - calls: Arc::new(AtomicUsize::new(0)), - payload: extractor_payload, - }), + Arc::new(StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubRerank), + Arc::new(SpyExtractor { calls: Arc::new(AtomicUsize::new(0)), payload: extractor_payload }), ); let collection = test_db.collection_name("elf_acceptance"); let docs_collection = test_db.collection_name("elf_acceptance_docs"); diff --git a/packages/elf-service/tests/acceptance/outbox_eventual_consistency.rs b/packages/elf-service/tests/acceptance/outbox_eventual_consistency.rs index 50fe9e50..f054ad1d 100644 --- a/packages/elf-service/tests/acceptance/outbox_eventual_consistency.rs +++ b/packages/elf-service/tests/acceptance/outbox_eventual_consistency.rs @@ -20,11 +20,11 @@ use tokio::{ }; use uuid::Uuid; -use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank}; +use crate::acceptance::{self, SpyExtractor, StubEmbedding, StubRerank, chunking::ChunkingConfig}; use elf_config::EmbeddingProviderConfig; use elf_service::{AddNoteInput, AddNoteRequest, ElfService, Providers}; use elf_storage::{db::Db, qdrant::QdrantStore}; -use elf_worker::worker; +use elf_worker::worker::{self, WorkerState}; #[derive(FromRow)] struct OutboxRow { @@ -131,7 +131,7 @@ async fn embed_handler( } async fn spawn_outbox_worker(service: &ElfService, api_base: String) -> JoinHandle<()> { - let worker_state = worker::WorkerState { + let worker_state = WorkerState { db: Db::connect(&service.cfg.storage.postgres).await.expect("Failed to connect worker DB."), qdrant: QdrantStore::new(&service.cfg.storage.qdrant) .expect("Failed to build Qdrant store."), @@ -150,7 +150,7 @@ async fn spawn_outbox_worker(service: &ElfService, api_base: String) -> JoinHand timeout_ms: 1_000, default_headers: Map::new(), }, - chunking: crate::acceptance::chunking::ChunkingConfig { max_tokens: 64, overlap_tokens: 8 }, + chunking: ChunkingConfig { max_tokens: 64, overlap_tokens: 8 }, tokenizer: build_test_tokenizer(), }; diff --git a/packages/elf-service/tests/acceptance/structured_field_retrieval.rs b/packages/elf-service/tests/acceptance/structured_field_retrieval.rs index 0fd069c5..d3103c43 100644 --- a/packages/elf-service/tests/acceptance/structured_field_retrieval.rs +++ b/packages/elf-service/tests/acceptance/structured_field_retrieval.rs @@ -12,7 +12,7 @@ use sqlx::PgExecutor; use time::OffsetDateTime; use uuid::Uuid; -use crate::acceptance; +use crate::acceptance::{self, SpyExtractor, StubEmbedding}; use elf_config::ProviderConfig; use elf_service::{BoxFuture, ElfService, Providers, RerankProvider, Result, SearchRequest}; use elf_storage::qdrant::{BM25_MODEL, BM25_VECTOR_NAME, DENSE_VECTOR_NAME}; @@ -117,9 +117,9 @@ async fn setup_context(test_name: &str) -> Option { return None; }; let providers = Providers::new( - Arc::new(crate::acceptance::StubEmbedding { vector_dim: 4_096 }), + Arc::new(StubEmbedding { vector_dim: 4_096 }), Arc::new(KeywordRerank { keyword: "ZEBRA" }), - Arc::new(crate::acceptance::SpyExtractor { + Arc::new(SpyExtractor { calls: Arc::new(AtomicUsize::new(0)), payload: serde_json::json!({ "notes": [] }), }), diff --git a/packages/elf-service/tests/acceptance/suite.rs b/packages/elf-service/tests/acceptance/suite.rs index 16471911..97c28bdc 100644 --- a/packages/elf-service/tests/acceptance/suite.rs +++ b/packages/elf-service/tests/acceptance/suite.rs @@ -13,7 +13,7 @@ mod structured_field_retrieval; mod trace_admin_observability; use std::{ - env, + env, fs, sync::{ Arc, atomic::{AtomicUsize, Ordering}, @@ -21,6 +21,7 @@ use std::{ time::Duration, }; +use ahash::AHashMap; use qdrant_client::{ QdrantError, qdrant::{ @@ -30,6 +31,7 @@ use qdrant_client::{ }; use serde_json::{Map, Value}; use sqlx::PgExecutor; +use tokenizers::{Tokenizer, models::wordlevel::WordLevel}; use tokio::time; use elf_config::{ @@ -240,7 +242,7 @@ pub fn test_config( enabled: true, max_tokens: 512, overlap_tokens: 128, - tokenizer_repo: "gpt2".to_string(), + tokenizer_repo: test_tokenizer_repo(&collection), }, security: Security { bind_localhost_only: true, @@ -302,6 +304,32 @@ pub async fn test_db() -> Option { Some(db) } +fn test_tokenizer_repo(collection: &str) -> String { + let tokenizer_path = env::temp_dir().join(format!("{collection}-tokenizer.json")); + + if tokenizer_path.exists() { + return tokenizer_path.to_string_lossy().into_owned(); + } + + let mut vocab = AHashMap::new(); + + vocab.insert("".to_string(), 0_u32); + + let model = WordLevel::builder() + .vocab(vocab) + .unk_token("".to_string()) + .build() + .expect("Failed to build acceptance tokenizer."); + let tokenizer = Tokenizer::new(model); + let parent = tokenizer_path.parent().expect("Temporary tokenizer path has a parent directory."); + + fs::create_dir_all(parent).expect("Failed to create acceptance tokenizer directory."); + + tokenizer.save(&tokenizer_path, false).expect("Failed to save acceptance tokenizer."); + + tokenizer_path.to_string_lossy().into_owned() +} + fn test_ranking() -> Ranking { Ranking { recency_tau_days: 60.0,