Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ command = "cargo"
args = [
"vstyle",
"curate",
"--language",
"rust",
"--workspace",
"--all-features"
]
Expand All @@ -95,6 +97,8 @@ command = "cargo"
args = [
"vstyle",
"tune",
"--language",
"rust",
"--workspace",
"--all-features",
"--strict",
Expand Down
5 changes: 3 additions & 2 deletions apps/elf-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ elf-storage = { workspace = true }
vergen-gitcl = { workspace = true }

[dev-dependencies]
sqlx = { workspace = true }
tower = { workspace = true }
qdrant-client = { workspace = true }
sqlx = { workspace = true }
tower = { workspace = true }

elf-testkit = { workspace = true }
48 changes: 27 additions & 21 deletions apps/elf-api/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use axum::{
DefaultBodyLimit, Extension, Path, Query, State,
rejection::{JsonRejection, QueryRejection},
},
http::{HeaderMap, Request, StatusCode},
http::{
HeaderMap, Request, StatusCode,
header::{CONTENT_LENGTH, CONTENT_TYPE},
},
middleware::{self, Next},
response::{IntoResponse, Response},
routing,
Expand Down Expand Up @@ -437,24 +440,27 @@ pub fn router(state: AppState) -> Router {
.route("/v2/notes/ingest", routing::post(notes_ingest))
.route("/v2/events/ingest", routing::post(events_ingest))
.route("/v2/searches", routing::post(searches_create))
.route("/v2/searches/:search_id", routing::get(searches_get))
.route("/v2/searches/:search_id/timeline", routing::get(searches_timeline))
.route("/v2/searches/:search_id/notes", routing::post(searches_notes))
.route("/v2/searches/{search_id}", routing::get(searches_get))
.route("/v2/searches/{search_id}/timeline", routing::get(searches_timeline))
.route("/v2/searches/{search_id}/notes", routing::post(searches_notes))
.route("/v2/graph/query", routing::post(graph_query))
.route("/v2/notes", routing::get(notes_list))
.route(
"/v2/notes/:note_id",
"/v2/notes/{note_id}",
routing::get(notes_get).patch(notes_patch).delete(notes_delete),
)
.route("/v2/notes/:note_id/publish", routing::post(notes_publish))
.route("/v2/notes/:note_id/unpublish", routing::post(notes_unpublish))
.route("/v2/spaces/:space/grants", routing::get(space_grants_list).post(space_grant_upsert))
.route("/v2/spaces/:space/grants/revoke", routing::post(space_grant_revoke))
.route("/v2/notes/{note_id}/publish", routing::post(notes_publish))
.route("/v2/notes/{note_id}/unpublish", routing::post(notes_unpublish))
.route(
"/v2/spaces/{space}/grants",
routing::get(space_grants_list).post(space_grant_upsert),
)
.route("/v2/spaces/{space}/grants/revoke", routing::post(space_grant_revoke))
.with_state(state.clone())
.layer(DefaultBodyLimit::max(MAX_REQUEST_BYTES));
let docs_router = Router::new()
.route("/v2/docs", routing::post(docs_put))
.route("/v2/docs/:doc_id", routing::get(docs_get))
.route("/v2/docs/{doc_id}", routing::get(docs_get))
.route("/v2/docs/search/l0", routing::post(docs_search_l0))
.route("/v2/docs/excerpts", routing::post(docs_excerpts_get))
.with_state(state)
Expand All @@ -477,11 +483,11 @@ pub fn admin_router(state: AppState) -> Router {
.put(admin_ingestion_profile_default_set),
)
.route(
"/v2/admin/events/ingestion-profiles/:profile_id/versions",
"/v2/admin/events/ingestion-profiles/{profile_id}/versions",
routing::get(admin_ingestion_profile_versions_list),
)
.route(
"/v2/admin/events/ingestion-profiles/:profile_id",
"/v2/admin/events/ingestion-profiles/{profile_id}",
routing::get(admin_ingestion_profile_get),
)
.route(
Expand All @@ -491,20 +497,20 @@ pub fn admin_router(state: AppState) -> Router {
.route("/v2/admin/qdrant/rebuild", routing::post(rebuild_qdrant))
.route("/v2/admin/searches/raw", routing::post(searches_raw))
.route("/v2/admin/traces/recent", routing::get(trace_recent_list))
.route("/v2/admin/traces/:trace_id", routing::get(trace_get))
.route("/v2/admin/traces/:trace_id/bundle", routing::get(trace_bundle_get))
.route("/v2/admin/trajectories/:trace_id", routing::get(trace_trajectory_get))
.route("/v2/admin/trace-items/:item_id", routing::get(trace_item_get))
.route("/v2/admin/traces/{trace_id}", routing::get(trace_get))
.route("/v2/admin/traces/{trace_id}/bundle", routing::get(trace_bundle_get))
.route("/v2/admin/trajectories/{trace_id}", routing::get(trace_trajectory_get))
.route("/v2/admin/trace-items/{item_id}", routing::get(trace_item_get))
.route("/v2/admin/graph/predicates", routing::get(admin_graph_predicates_list))
.route(
"/v2/admin/graph/predicates/:predicate_id",
"/v2/admin/graph/predicates/{predicate_id}",
routing::patch(admin_graph_predicate_patch),
)
.route(
"/v2/admin/graph/predicates/:predicate_id/aliases",
"/v2/admin/graph/predicates/{predicate_id}/aliases",
routing::post(admin_graph_predicate_alias_add).get(admin_graph_predicate_aliases_list),
)
.route("/v2/admin/notes/:note_id/provenance", routing::get(admin_note_provenance_get))
.route("/v2/admin/notes/{note_id}/provenance", routing::get(admin_note_provenance_get))
.with_state(state)
.layer(DefaultBodyLimit::max(MAX_REQUEST_BYTES))
.layer(middleware::from_fn_with_state(auth_state, admin_auth_middleware))
Expand Down Expand Up @@ -818,7 +824,7 @@ async fn with_request_id(response: Response, request_id: Uuid) -> Response {

let is_json_response = parts
.headers
.get(axum::http::header::CONTENT_TYPE)
.get(CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.map(|content_type| content_type.starts_with("application/json"))
.unwrap_or(false);
Expand All @@ -833,7 +839,7 @@ async fn with_request_id(response: Response, request_id: Uuid) -> Response {
};

if let Some(response_body) = inject_request_id_into_json_body(&body_bytes, &request_id) {
parts.headers.remove(axum::http::header::CONTENT_LENGTH);
parts.headers.remove(CONTENT_LENGTH);

Response::from_parts(parts, Body::from(response_body))
} else {
Expand Down
126 changes: 110 additions & 16 deletions apps/elf-api/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@

//! End-to-end HTTP integration tests for the ELF API app.

use std::env;
use std::{collections::HashMap, env};

use axum::{
Router,
body::{self, Body},
http::{Request, Response, StatusCode},
};
use qdrant_client::{
client::Payload,
qdrant::{Document, PointStruct, UpsertPointsBuilder, Vector},
};
use serde_json::Map;
use tower::util::ServiceExt as _;
use tracing::Level;
use uuid::Uuid;

use elf_api::{routes, state::AppState};
Expand All @@ -23,6 +28,7 @@ use elf_config::{
SearchExpansion, SearchExplain, SearchPrefilter, Security, SecurityAuthKey, SecurityAuthRole,
Service, Storage, TtlDays,
};
use elf_storage::qdrant::{BM25_MODEL, BM25_VECTOR_NAME, DENSE_VECTOR_NAME};
use elf_testkit::TestDatabase;

const TEST_TENANT_ID: &str = "tenant_alpha";
Expand Down Expand Up @@ -192,11 +198,11 @@ fn test_config(dsn: String, qdrant_url: String, collection: String) -> Config {

fn dummy_embedding_provider() -> EmbeddingProviderConfig {
EmbeddingProviderConfig {
provider_id: "test".to_string(),
provider_id: "local".to_string(),
api_base: "http://127.0.0.1:1".to_string(),
api_key: "test-key".to_string(),
path: "/".to_string(),
model: "test".to_string(),
model: "local-hash".to_string(),
dimensions: 4_096,
timeout_ms: 1_000,
default_headers: Map::new(),
Expand All @@ -205,11 +211,11 @@ fn dummy_embedding_provider() -> EmbeddingProviderConfig {

fn dummy_provider() -> ProviderConfig {
ProviderConfig {
provider_id: "test".to_string(),
provider_id: "local".to_string(),
api_base: "http://127.0.0.1:1".to_string(),
api_key: "test-key".to_string(),
path: "/".to_string(),
model: "test".to_string(),
model: "local-token-overlap".to_string(),
timeout_ms: 1_000,
default_headers: Map::new(),
}
Expand All @@ -228,6 +234,10 @@ fn dummy_llm_provider() -> LlmProviderConfig {
}
}

fn init_test_tracing() {
let _ = tracing_subscriber::fmt().with_max_level(Level::ERROR).with_test_writer().try_init();
}

async fn test_env() -> Option<(TestDatabase, String, String)> {
let base_dsn = match elf_testkit::env_dsn() {
Some(value) => value,
Expand Down Expand Up @@ -526,9 +536,12 @@ async fn post_with_authorization_and_json_body(

async fn create_note_for_payload_level_tests(
app: &Router,
state: &AppState,
text: &str,
source_ref: serde_json::Value,
) -> Uuid {
init_test_tracing();

let payload = serde_json::json!({
"scope": "agent_private",
"notes": [{
Expand Down Expand Up @@ -556,12 +569,18 @@ async fn create_note_for_payload_level_tests(
)
.await
.expect("Failed to call note ingest.");

assert_eq!(response.status(), StatusCode::OK);

let status = response.status();
let body = body::to_bytes(response.into_body(), usize::MAX)
.await
.expect("Failed to read note ingest response body.");

assert_eq!(
status,
StatusCode::OK,
"Unexpected note ingest status with body: {}",
String::from_utf8_lossy(&body)
);

let json: serde_json::Value =
serde_json::from_slice(&body).expect("Failed to parse note ingest response.");
let note_id = json["results"]
Expand All @@ -570,8 +589,82 @@ async fn create_note_for_payload_level_tests(
.first()
.and_then(|result| result["note_id"].as_str())
.expect("Missing note_id in note ingest response.");
let note_id = Uuid::parse_str(note_id).expect("Invalid note_id in note ingest response.");

index_note_for_payload_level_tests(state, note_id, text).await;

Uuid::parse_str(note_id).expect("Invalid note_id in note ingest response.")
note_id
}

async fn index_note_for_payload_level_tests(state: &AppState, note_id: Uuid, text: &str) {
let chunk_id = Uuid::new_v4();
let embedding_version = format!(
"{}:{}:{}",
state.service.cfg.providers.embedding.provider_id,
state.service.cfg.providers.embedding.model,
state.service.cfg.storage.qdrant.vector_dim
);

sqlx::query(
"INSERT INTO memory_note_chunks (
chunk_id,
note_id,
chunk_index,
start_offset,
end_offset,
text,
embedding_version
) VALUES ($1, $2, $3, $4, $5, $6, $7)",
)
.bind(chunk_id)
.bind(note_id)
.bind(0_i32)
.bind(0_i32)
.bind(i32::try_from(text.len()).expect("Payload-level test text fits i32 offsets."))
.bind(text)
.bind(embedding_version.as_str())
.execute(&state.service.db.pool)
.await
.expect("Failed to seed memory note chunk.");

let mut payload = Payload::new();

payload.insert("note_id", note_id.to_string());
payload.insert("chunk_id", chunk_id.to_string());
payload.insert("chunk_index", 0_i64);
payload.insert("start_offset", 0_i64);
payload.insert("end_offset", i64::try_from(text.len()).expect("Test text fits i64 offsets."));
payload.insert("tenant_id", TEST_TENANT_ID);
payload.insert("project_id", TEST_PROJECT_ID);
payload.insert("agent_id", TEST_AGENT_A);
payload.insert("scope", "agent_private");
payload.insert("type", "fact");
payload.insert("status", "active");
payload.insert("embedding_version", embedding_version);

let mut vectors = HashMap::new();

vectors.insert(
DENSE_VECTOR_NAME.to_string(),
Vector::from(vec![0.0_f32; state.service.qdrant.vector_dim as usize]),
);
vectors.insert(
BM25_VECTOR_NAME.to_string(),
Vector::from(Document::new(text.to_string(), BM25_MODEL)),
);

let point = PointStruct::new(chunk_id.to_string(), vectors, payload);

state
.service
.qdrant
.client
.upsert_points(
UpsertPointsBuilder::new(state.service.qdrant.collection.clone(), vec![point])
.wait(true),
)
.await
.expect("Failed to seed Qdrant point.");
}

async fn insert_note_summary_field(state: &AppState, note_id: Uuid, summary: &str) {
Expand Down Expand Up @@ -638,6 +731,7 @@ async fn fetch_admin_search_raw_source_ref(
payload_level: &str,
) -> serde_json::Value {
let payload = serde_json::json!({
"mode": "quick_find",
"query": query,
"top_k": 5,
"candidate_k": 10,
Expand Down Expand Up @@ -1402,10 +1496,9 @@ async fn searches_notes_payload_level_shapes_source_ref_and_structured() {
}
});
let structured_summary = "Compact structured summary used for payload-level l1 and l2 shaping.";
let note_text = "A substantially long payload shaping note used in contract tests for search details output shaping. "
.repeat(6);
let note_text = "A payload shaping note used in contract tests for search details output shaping. It includes deliberate spacing and\nline breaks so l0 compaction can be observed.";
let note_id =
create_note_for_payload_level_tests(&app, note_text.as_str(), source_ref.clone()).await;
create_note_for_payload_level_tests(&app, &state, note_text, source_ref.clone()).await;

insert_note_summary_field(&state, note_id, structured_summary).await;

Expand Down Expand Up @@ -1496,9 +1589,9 @@ async fn searches_notes_payload_level_shapes_source_ref_and_structured() {
assert!(notes_l1["structured"].is_object());
assert!(notes_l2["structured"].is_object());
assert!(notes_l0_text.len() <= 240);
assert_ne!(notes_l0_text, note_text.as_str());
assert_ne!(notes_l0_text, note_text);
assert_eq!(notes_l1_text, structured_summary);
assert_eq!(notes_l2_text, note_text.as_str());
assert_eq!(notes_l2_text, note_text);

test_db.cleanup().await.expect("Failed to cleanup test database.");
}
Expand All @@ -1512,7 +1605,7 @@ async fn admin_searches_raw_payload_level_shapes_source_ref() {
let config = test_config(test_db.dsn().to_string(), qdrant_url, collection);
let state = AppState::new(config).await.expect("Failed to initialize app state.");
let app = routes::router(state.clone());
let admin_app = routes::admin_router(state);
let admin_app = routes::admin_router(state.clone());
let source_ref = serde_json::json!({
"schema": "note_source_ref/v1",
"locator": {
Expand All @@ -1526,7 +1619,8 @@ async fn admin_searches_raw_payload_level_shapes_source_ref() {
});
let note_text =
"Admin raw search payload shaping contract note. This long note should be indexed.";
let _note_id = create_note_for_payload_level_tests(&app, note_text, source_ref.clone()).await;
let _note_id =
create_note_for_payload_level_tests(&app, &state, note_text, source_ref.clone()).await;
let raw_l0 = fetch_admin_search_raw_source_ref(&admin_app, "payload shaping", "l0").await;
let raw_l1 = fetch_admin_search_raw_source_ref(&admin_app, "payload shaping", "l1").await;
let raw_l2 = fetch_admin_search_raw_source_ref(&admin_app, "payload shaping", "l2").await;
Expand Down
Loading