diff --git a/apps/elf-api/src/routes.rs b/apps/elf-api/src/routes.rs index 4de227b7..d255d6cf 100644 --- a/apps/elf-api/src/routes.rs +++ b/apps/elf-api/src/routes.rs @@ -47,21 +47,23 @@ use elf_service::{ ConsolidationProposalReviewRequest, ConsolidationProposalsListRequest, ConsolidationProposalsListResponse, ConsolidationRunCreateRequest, ConsolidationRunCreateResponse, ConsolidationRunGetRequest, ConsolidationRunResponse, - ConsolidationRunsListRequest, ConsolidationRunsListResponse, DeleteRequest, DeleteResponse, - DocType, DocsExcerptResponse, DocsExcerptsGetRequest, DocsGetRequest, DocsGetResponse, - DocsPutRequest, DocsPutResponse, DocsSearchL0Request, DocsSearchL0Response, Error, - EventMessage, GranteeKind, GraphQueryEntityRef, GraphQueryPredicateRef, GraphQueryRequest, - GraphQueryResponse, IngestionProfileSelector, KnowledgePageGetRequest, - KnowledgePageLintRequest, KnowledgePageLintResponse, KnowledgePageRebuildRequest, - KnowledgePageRebuildResponse, KnowledgePageResponse, KnowledgePageSearchRequest, - KnowledgePageSearchResponse, KnowledgePagesListRequest, KnowledgePagesListResponse, - ListRequest, ListResponse, MemoryHistoryGetRequest, MemoryHistoryResponse, NoteFetchRequest, - NoteFetchResponse, NoteProvenanceBundleResponse, NoteProvenanceGetRequest, PayloadLevel, - PublishNoteRequest, QueryPlan, RankingRequestOverride, RebuildReport, SearchDetailsRequest, - SearchDetailsResult, SearchExplainRequest, SearchExplainResponse, SearchIndexItem, - SearchRequest, SearchResponse, SearchSessionGetRequest, SearchTimelineGroup, - SearchTimelineRequest, SearchTrajectoryResponse, SearchTrajectorySummary, ShareScope, - SpaceGrantRevokeRequest, SpaceGrantRevokeResponse, SpaceGrantUpsertRequest, + ConsolidationRunsListRequest, ConsolidationRunsListResponse, CoreBlockAttachRequest, + CoreBlockAttachResponse, CoreBlockDetachRequest, CoreBlockDetachResponse, + CoreBlockUpsertRequest, CoreBlockUpsertResponse, CoreBlocksGetRequest, CoreBlocksResponse, + DeleteRequest, DeleteResponse, DocType, DocsExcerptResponse, DocsExcerptsGetRequest, + DocsGetRequest, DocsGetResponse, DocsPutRequest, DocsPutResponse, DocsSearchL0Request, + DocsSearchL0Response, Error, EventMessage, GranteeKind, GraphQueryEntityRef, + GraphQueryPredicateRef, GraphQueryRequest, GraphQueryResponse, IngestionProfileSelector, + KnowledgePageGetRequest, KnowledgePageLintRequest, KnowledgePageLintResponse, + KnowledgePageRebuildRequest, KnowledgePageRebuildResponse, KnowledgePageResponse, + KnowledgePageSearchRequest, KnowledgePageSearchResponse, KnowledgePagesListRequest, + KnowledgePagesListResponse, ListRequest, ListResponse, MemoryHistoryGetRequest, + MemoryHistoryResponse, NoteFetchRequest, NoteFetchResponse, NoteProvenanceBundleResponse, + NoteProvenanceGetRequest, PayloadLevel, PublishNoteRequest, QueryPlan, RankingRequestOverride, + RebuildReport, SearchDetailsRequest, SearchDetailsResult, SearchExplainRequest, + SearchExplainResponse, SearchIndexItem, SearchRequest, SearchResponse, SearchSessionGetRequest, + SearchTimelineGroup, SearchTimelineRequest, SearchTrajectoryResponse, SearchTrajectorySummary, + ShareScope, SpaceGrantRevokeRequest, SpaceGrantRevokeResponse, SpaceGrantUpsertRequest, SpaceGrantsListRequest, TextPositionSelector, TextQuoteSelector, TraceBundleGetRequest, TraceBundleResponse, TraceGetRequest, TraceGetResponse, TraceRecentListRequest, TraceRecentListResponse, TraceTrajectoryGetRequest, UnpublishNoteRequest, UpdateRequest, @@ -112,6 +114,10 @@ const VIEWER_HTML: &str = include_str!("../static/viewer.html"); docs_get, docs_search_l0, docs_excerpts_get, + core_blocks_get, + admin_core_block_upsert, + admin_core_block_attach, + admin_core_block_detach, graph_query, searches_create, searches_get, @@ -218,6 +224,25 @@ struct DocsPutBody { content: String, } +#[derive(Clone, Debug, Deserialize)] +struct CoreBlockUpsertBody { + block_id: Option, + scope: String, + key: String, + title: String, + content: String, + #[serde(default)] + source_ref: Value, + reason: Option, +} + +#[derive(Clone, Debug, Deserialize)] +struct CoreBlockAttachBody { + target_agent_id: String, + read_profile: String, + reason: Option, +} + #[derive(Clone, Debug, Deserialize)] struct DocsSearchL0Body { query: String, @@ -612,6 +637,7 @@ pub fn router(state: AppState) -> Router { .route("/health", routing::get(health)) .route("/v2/notes/ingest", routing::post(notes_ingest)) .route("/v2/events/ingest", routing::post(events_ingest)) + .route("/v2/core-blocks", routing::get(core_blocks_get)) .route("/v2/searches", routing::post(searches_create)) .route("/v2/searches/{search_id}", routing::get(searches_get)) .route("/v2/searches/{search_id}/timeline", routing::get(searches_timeline)) @@ -654,6 +680,15 @@ pub fn admin_router(state: AppState) -> Router { .route("/v2/admin/searches/{search_id}", routing::get(searches_get)) .route("/v2/admin/searches/{search_id}/timeline", routing::get(searches_timeline)) .route("/v2/admin/searches/{search_id}/notes", routing::post(searches_notes)) + .route("/v2/admin/core-blocks", routing::post(admin_core_block_upsert)) + .route( + "/v2/admin/core-blocks/{block_id}/attachments", + routing::post(admin_core_block_attach), + ) + .route( + "/v2/admin/core-blocks/attachments/{attachment_id}", + routing::delete(admin_core_block_detach), + ) .route("/v2/admin/notes", routing::get(notes_list)) .route("/v2/admin/notes/{note_id}", routing::get(notes_get)) .route( @@ -1360,6 +1395,165 @@ async fn docs_put( Ok(Json(response)) } +#[utoipa::path( + get, + path = "/v2/core-blocks", + tag = "core_blocks", + responses( + (status = 200, description = "Attached core memory blocks.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Scope denied.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn core_blocks_get( + State(state): State, + headers: HeaderMap, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let read_profile = required_read_profile(&headers)?; + let response = state + .service + .core_blocks_get(CoreBlocksGetRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + agent_id: ctx.agent_id, + read_profile, + }) + .await?; + + Ok(Json(response)) +} + +#[utoipa::path( + post, + path = "/v2/admin/core-blocks", + tag = "core_blocks", + request_body = Value, + responses( + (status = 200, description = "Core block was stored.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Scope denied.", body = ErrorBody), + (status = 409, description = "Core block conflict.", body = ErrorBody), + (status = 422, description = "Non-English input rejected.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn admin_core_block_upsert( + State(state): State, + headers: HeaderMap, + role: Option>, + payload: Result, JsonRejection>, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let Json(payload) = payload.map_err(|err| { + tracing::warn!(error = %err, "Invalid request payload."); + + json_error(StatusCode::BAD_REQUEST, "INVALID_REQUEST", "Invalid request payload.", None) + })?; + let role = role.map(|Extension(role)| role); + + if payload.scope.trim() == "org_shared" { + require_admin_for_org_shared_writes(state.service.cfg.security.auth_mode.as_str(), role)?; + } + + let response = state + .service + .core_block_upsert(CoreBlockUpsertRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + agent_id: ctx.agent_id, + block_id: payload.block_id, + scope: payload.scope, + key: payload.key, + title: payload.title, + content: payload.content, + source_ref: payload.source_ref, + reason: payload.reason, + }) + .await?; + + Ok(Json(response)) +} + +#[utoipa::path( + post, + path = "/v2/admin/core-blocks/{block_id}/attachments", + tag = "core_blocks", + params(("block_id" = Uuid, Path, description = "Core block ID.")), + request_body = Value, + responses( + (status = 200, description = "Core block was attached.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Scope denied.", body = ErrorBody), + (status = 404, description = "Core block was not found.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn admin_core_block_attach( + State(state): State, + headers: HeaderMap, + Path(block_id): Path, + payload: Result, JsonRejection>, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let Json(payload) = payload.map_err(|err| { + tracing::warn!(error = %err, "Invalid request payload."); + + json_error(StatusCode::BAD_REQUEST, "INVALID_REQUEST", "Invalid request payload.", None) + })?; + let response = state + .service + .core_block_attach(CoreBlockAttachRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + agent_id: ctx.agent_id, + block_id, + target_agent_id: payload.target_agent_id, + read_profile: payload.read_profile, + reason: payload.reason, + }) + .await?; + + Ok(Json(response)) +} + +#[utoipa::path( + delete, + path = "/v2/admin/core-blocks/attachments/{attachment_id}", + tag = "core_blocks", + params(("attachment_id" = Uuid, Path, description = "Core block attachment ID.")), + responses( + (status = 200, description = "Core block attachment was detached.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Scope denied.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn admin_core_block_detach( + State(state): State, + headers: HeaderMap, + Path(attachment_id): Path, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let response = state + .service + .core_block_detach(CoreBlockDetachRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + agent_id: ctx.agent_id, + attachment_id, + reason: None, + }) + .await?; + + Ok(Json(response)) +} + #[utoipa::path( get, path = "/v2/docs/{doc_id}", diff --git a/apps/elf-api/tests/http.rs b/apps/elf-api/tests/http.rs index fe5a4d9d..a59acdba 100644 --- a/apps/elf-api/tests/http.rs +++ b/apps/elf-api/tests/http.rs @@ -264,6 +264,24 @@ fn init_test_tracing() { let _ = tracing_subscriber::fmt().with_max_level(Level::ERROR).with_test_writer().try_init(); } +fn context_request( + method: &str, + uri: impl AsRef, + agent_id: &str, + read_profile: &str, +) -> Request { + Request::builder() + .method(method) + .uri(uri.as_ref()) + .header("content-type", "application/json") + .header("X-ELF-Tenant-Id", TEST_TENANT_ID) + .header("X-ELF-Project-Id", TEST_PROJECT_ID) + .header("X-ELF-Agent-Id", agent_id) + .header("X-ELF-Read-Profile", read_profile) + .body(Body::empty()) + .expect("Failed to build context request.") +} + async fn test_env() -> Option<(TestDatabase, String, String)> { let base_dsn = match elf_testkit::env_dsn() { Some(value) => value, @@ -364,6 +382,93 @@ async fn insert_project_scope_grant( .expect("Failed to seed project scope grant."); } +async fn search_session_count(state: &AppState) -> i64 { + sqlx::query_scalar("SELECT COUNT(*) FROM search_sessions") + .fetch_one(&state.service.db.pool) + .await + .expect("Failed to count search sessions.") +} + +async fn post_admin_json( + app: &Router, + uri: impl AsRef, + agent_id: &str, + body: serde_json::Value, +) -> (StatusCode, serde_json::Value) { + let request = Request::builder() + .method("POST") + .uri(uri.as_ref()) + .header("content-type", "application/json") + .header("X-ELF-Tenant-Id", TEST_TENANT_ID) + .header("X-ELF-Project-Id", TEST_PROJECT_ID) + .header("X-ELF-Agent-Id", agent_id) + .body(Body::from(body.to_string())) + .expect("Failed to build admin JSON request."); + let response = app.clone().oneshot(request).await.expect("Failed to call admin route."); + let status = response.status(); + let body = body::to_bytes(response.into_body(), usize::MAX) + .await + .expect("Failed to read admin response body."); + + (status, serde_json::from_slice(&body).expect("Failed to parse admin response.")) +} + +async fn create_core_block(admin_app: &Router, scope: &str, key: &str, content: &str) -> Uuid { + let payload = serde_json::json!({ + "scope": scope, + "key": key, + "title": "Operating context", + "content": content, + "source_ref": { + "schema": "core_block_source/v1", + "ref": { "issue": "XY-832" } + } + }); + let (status, body) = + post_admin_json(admin_app, "/v2/admin/core-blocks", TEST_AGENT_A, payload).await; + + assert_eq!(status, StatusCode::OK); + + Uuid::parse_str( + body.pointer("/block/block_id") + .and_then(serde_json::Value::as_str) + .expect("Missing core block id."), + ) + .expect("Invalid core block id.") +} + +async fn attach_core_block( + admin_app: &Router, + block_id: Uuid, + target_agent_id: &str, + read_profile: &str, +) -> (StatusCode, serde_json::Value) { + let payload = serde_json::json!({ + "target_agent_id": target_agent_id, + "read_profile": read_profile, + "reason": "Attach fixture block." + }); + let uri = format!("/v2/admin/core-blocks/{block_id}/attachments"); + + post_admin_json(admin_app, uri, TEST_AGENT_A, payload).await +} + +async fn get_core_blocks(app: &Router, agent_id: &str, read_profile: &str) -> serde_json::Value { + let response = app + .clone() + .oneshot(context_request("GET", "/v2/core-blocks", agent_id, read_profile)) + .await + .expect("Failed to fetch core blocks."); + + assert_eq!(response.status(), StatusCode::OK); + + let body = body::to_bytes(response.into_body(), usize::MAX) + .await + .expect("Failed to read core blocks response body."); + + serde_json::from_slice(&body).expect("Failed to parse core blocks response.") +} + async fn active_project_grant_count(state: &AppState, owner_agent_id: &str) -> i64 { sqlx::query_scalar( "SELECT COUNT(*) FROM memory_space_grants \ @@ -839,8 +944,12 @@ async fn openapi_json_route_serves_generated_contract() { assert_openapi_method(&spec, "/health", "get"); assert_openapi_method(&spec, "/v2/notes/ingest", "post"); assert_openapi_method(&spec, "/v2/events/ingest", "post"); + assert_openapi_method(&spec, "/v2/core-blocks", "get"); assert_openapi_method(&spec, "/v2/docs/search/l0", "post"); assert_openapi_method(&spec, "/v2/searches/{search_id}/notes", "post"); + assert_openapi_method(&spec, "/v2/admin/core-blocks", "post"); + assert_openapi_method(&spec, "/v2/admin/core-blocks/{block_id}/attachments", "post"); + assert_openapi_method(&spec, "/v2/admin/core-blocks/attachments/{attachment_id}", "delete"); assert_openapi_method(&spec, "/v2/admin/searches/raw", "post"); assert_openapi_method(&spec, "/v2/admin/events/ingestion-profiles/default", "get"); assert_openapi_method(&spec, "/v2/admin/events/ingestion-profiles/default", "put"); @@ -988,6 +1097,83 @@ async fn sharing_visibility_requires_explicit_project_grant() { test_db.cleanup().await.expect("Failed to cleanup test database."); } +#[tokio::test] +#[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_GRPC_URL (or ELF_QDRANT_URL) to run."] +async fn core_blocks_are_explicitly_attached_and_separate_from_archival_search() { + let Some((test_db, qdrant_url, collection)) = test_env().await else { + return; + }; + let config = test_config(test_db.dsn().to_string(), qdrant_url, collection); + let state = AppState::new(config).await.expect("Failed to initialize app state."); + let app = routes::router(state.clone()); + let admin_app = routes::admin_router(state.clone()); + let private_block_id = create_core_block( + &admin_app, + "agent_private", + "private_operating_context", + "Preference: Keep core context separate from archival search.", + ) + .await; + let note_id = Uuid::new_v4(); + + insert_note( + &state, + note_id, + "agent_private", + TEST_AGENT_A, + "Fact: This archival note must not appear in attached core blocks.", + ) + .await; + + let (status, _) = + attach_core_block(&admin_app, private_block_id, TEST_AGENT_A, "private_only").await; + let before_sessions = search_session_count(&state).await; + let blocks = get_core_blocks(&app, TEST_AGENT_A, "private_only").await; + let after_sessions = search_session_count(&state).await; + + assert_eq!(status, StatusCode::OK); + assert_eq!(before_sessions, after_sessions); + assert_eq!(blocks["schema"], "elf.core_memory_blocks/v1"); + assert_eq!(blocks["items"].as_array().expect("items array").len(), 1); + assert_eq!( + blocks["items"][0]["content"], + "Preference: Keep core context separate from archival search." + ); + assert_eq!(blocks["items"][0]["source_ref"]["schema"], "core_block_source/v1"); + assert!(blocks["items"][0]["audit_history"].as_array().expect("audit history").len() >= 2); + assert!(!blocks.to_string().contains("archival note must not appear")); + + let b_private = get_core_blocks(&app, TEST_AGENT_B, "private_only").await; + + assert_eq!(b_private["items"].as_array().expect("items array").len(), 0); + + let shared_block_id = create_core_block( + &admin_app, + "project_shared", + "shared_operating_context", + "Constraint: Shared core context requires explicit project grant and attachment.", + ) + .await; + let (denied_status, _) = + attach_core_block(&admin_app, shared_block_id, TEST_AGENT_B, "private_plus_project").await; + + assert_eq!(denied_status, StatusCode::FORBIDDEN); + + insert_project_scope_grant(&state, TEST_AGENT_A, TEST_AGENT_A).await; + + let (shared_status, _) = + attach_core_block(&admin_app, shared_block_id, TEST_AGENT_B, "private_plus_project").await; + let b_shared = get_core_blocks(&app, TEST_AGENT_B, "private_plus_project").await; + let b_wrong_profile = get_core_blocks(&app, TEST_AGENT_B, "private_only").await; + + assert_eq!(shared_status, StatusCode::OK); + assert_eq!(b_shared["items"].as_array().expect("items array").len(), 1); + assert_eq!(b_shared["items"][0]["scope"], "project_shared"); + assert_eq!(b_wrong_profile["items"].as_array().expect("items array").len(), 0); + + test_db.cleanup().await.expect("Failed to cleanup test database."); +} + #[tokio::test] #[ignore = "Requires external Postgres and Qdrant. Set ELF_PG_DSN and ELF_QDRANT_GRPC_URL (or ELF_QDRANT_URL) to run."] async fn org_shared_note_is_visible_across_projects() { diff --git a/apps/elf-mcp/src/server.rs b/apps/elf-mcp/src/server.rs index 2d67b4b8..d7c60891 100644 --- a/apps/elf-mcp/src/server.rs +++ b/apps/elf-mcp/src/server.rs @@ -322,6 +322,21 @@ impl ElfMcp { self.forward(HttpMethod::Post, "/v2/docs/excerpts", params, None).await } + #[rmcp::tool( + name = "elf_core_blocks_get", + description = "Fetch core memory blocks explicitly attached to the configured agent and read profile. This is separate from archival search.", + input_schema = core_blocks_get_schema() + )] + async fn elf_core_blocks_get( + &self, + mut params: JsonObject, + ) -> Result { + // read_profile is part of the MCP server configuration and is not client-controlled. + let _ = take_optional_string(&mut params, "read_profile")?; + + self.forward(HttpMethod::Get, "/v2/core-blocks", params, None).await + } + #[rmcp::tool( name = "elf_searches_create", description = "Create a search session using quick-find or planned-search mode. Response includes optional trajectory_summary for staged retrieval progress.", @@ -1172,6 +1187,16 @@ fn docs_excerpts_get_schema() -> Arc { })) } +fn core_blocks_get_schema() -> Arc { + Arc::new(rmcp::object!({ + "type": "object", + "additionalProperties": true, + "properties": { + "read_profile": { "type": ["string", "null"] } + } + })) +} + fn searches_create_schema() -> Arc { let filter_schema = rmcp::object!({ "type": "object", @@ -1551,7 +1576,7 @@ mod tests { type RequestRecorder = Arc>>>; - const ALL_TOOL_DEFINITIONS: [ToolDefinition; 29] = [ + const ALL_TOOL_DEFINITIONS: [ToolDefinition; 30] = [ ToolDefinition::new( "elf_notes_ingest", HttpMethod::Post, @@ -1576,6 +1601,12 @@ mod tests { "/v2/searches", "Create a search session using quick-find or planned-search mode. Response includes optional trajectory_summary.", ), + ToolDefinition::new( + "elf_core_blocks_get", + HttpMethod::Get, + "/v2/core-blocks", + "Fetch core memory blocks explicitly attached to the configured agent and read profile.", + ), ToolDefinition::new( "elf_searches_get", HttpMethod::Get, @@ -1765,6 +1796,7 @@ mod tests { "elf_notes_ingest", "elf_graph_query", "elf_events_ingest", + "elf_core_blocks_get", "elf_searches_create", "elf_searches_get", "elf_searches_timeline", diff --git a/docs/spec/system_elf_memory_service_v2.md b/docs/spec/system_elf_memory_service_v2.md index ad86d61b..1d19df90 100644 --- a/docs/spec/system_elf_memory_service_v2.md +++ b/docs/spec/system_elf_memory_service_v2.md @@ -41,6 +41,7 @@ Optional future work: ============================================================ I1. Postgres with pgvector is the only source of truth for: - memory notes + - scoped core memory blocks and attachments - chunk embedding vectors - chunk metadata - pooled note embeddings (derived) @@ -630,6 +631,80 @@ details must include: - min_importance - write_policy_audits (add_note: single object, add_event: array of message audits, optional) +5.15 core_memory_blocks (authoritative always-attached context blocks) +- block_id uuid primary key +- tenant_id text not null +- project_id text not null +- agent_id text not null +- scope text not null +- key text not null +- title text not null +- content text not null +- source_ref jsonb not null +- status text not null +- created_at timestamptz not null +- updated_at timestamptz not null + +Rules: +- Core blocks are small read-only operating context, separate from archival note search. +- Core blocks must not be indexed into Qdrant or returned by archival search unless a future explicit contract says so. +- source_ref must be a JSON object and is returned with block readback. +- scope, write permission, English gate, auth, and shared-grant rules apply. + +Indexes: +- uq_core_memory_blocks_active_key: (tenant_id, project_id, agent_id, scope, key) WHERE status = 'active' +- idx_core_memory_blocks_scope_status: (tenant_id, project_id, scope, status) + +5.16 core_memory_block_attachments (explicit block attachment) +- attachment_id uuid primary key +- block_id uuid not null references core_memory_blocks(block_id) on delete cascade +- tenant_id text not null +- project_id text not null +- agent_id text not null +- read_profile text not null +- attached_by_agent_id text not null +- attached_at timestamptz not null +- detached_by_agent_id text null +- detached_at timestamptz null + +Rules: +- Active attachment is exact to tenant_id, project_id, agent_id, read_profile, and block_id. +- Attachment does not bypass scope access. Readback still applies read_profile scope resolution, + private-owner checks, shared grants, and block status. +- Detached rows remain as audit evidence. + +Indexes: +- uq_core_memory_block_attachments_active: + (tenant_id, project_id, agent_id, read_profile, block_id) WHERE detached_at IS NULL +- idx_core_memory_block_attachments_read: + (tenant_id, project_id, agent_id, read_profile, detached_at) +- idx_core_memory_block_attachments_block: (block_id, detached_at) + +5.17 core_memory_block_events (append-only block audit) +- event_id uuid primary key +- block_id uuid not null references core_memory_blocks(block_id) on delete cascade +- attachment_id uuid null references core_memory_block_attachments(attachment_id) on delete set null +- tenant_id text not null +- project_id text not null +- actor_agent_id text not null +- event_type text not null +- target_agent_id text null +- read_profile text null +- prev_snapshot jsonb null +- new_snapshot jsonb null +- reason text not null +- ts timestamptz not null + +event_type values: +- block_created +- block_updated +- attachment_added +- attachment_removed + +Rules: +- Every block create/update and attachment add/remove writes one event. +- Block readback may include audit history for returned blocks. + ============================================================ 6. QDRANT COLLECTION (DERIVED INDEX ONLY) ============================================================ @@ -982,6 +1057,17 @@ Behavior: - These endpoints mirror the public note list/detail reads for local admin viewer use. - Note metadata that includes `created_at`, `hit_count`, and `last_hit_at` is available through `GET /v2/admin/notes/{note_id}/provenance`. +Admin core memory block management: +- POST /v2/admin/core-blocks +- POST /v2/admin/core-blocks/{block_id}/attachments +- DELETE /v2/admin/core-blocks/attachments/{attachment_id} + +Behavior: +- These endpoints create/update core blocks and attach/detach them for exact tenant/project/agent/read_profile readback. +- Core blocks are read-only to normal public callers; public callers only read attached blocks. +- Mutations write append-only `core_memory_block_events`. +- Core blocks are not note-search hits and do not write Qdrant points, search sessions, search traces, or note outbox rows. + Admin consolidation proposal review: - POST /v2/admin/consolidation/runs - GET /v2/admin/consolidation/runs @@ -1787,6 +1873,47 @@ Notes: - `evidence_note_ids` is ordered by evidence creation time and capped to 16 IDs per fact. - `explain` defaults to false; when true, response includes `explain.schema = "elf.graph_query/v1"`. +GET /v2/core-blocks + +Headers: +- X-ELF-Tenant-Id, X-ELF-Project-Id, X-ELF-Agent-Id +- X-ELF-Read-Profile + +Response: +{ + "schema": "elf.core_memory_blocks/v1", + "tenant_id": "string", + "project_id": "string", + "agent_id": "string", + "read_profile": "private_only|private_plus_project|all_scopes", + "items": [ + { + "block_id": "uuid", + "attachment_id": "uuid", + "tenant_id": "string", + "project_id": "string", + "agent_id": "block-owner-agent", + "scope": "agent_private|project_shared|org_shared", + "key": "string", + "title": "string", + "content": "small English operating context", + "source_ref": { ... }, + "status": "active", + "updated_at": "...", + "attached_at": "...", + "attached_by_agent_id": "string", + "audit_history": [ ... ] + } + ] +} + +Notes: +- This endpoint is not archival search. It does not embed, rerank, search Qdrant, + create a search session, or record note hits. +- A block is returned only when it has an active attachment for the exact + tenant/project/agent/read_profile and the block is readable under that read_profile's + scopes and shared grants. + POST /v2/searches Headers: @@ -2120,6 +2247,7 @@ Original query: - Tools map 1:1 to v2 endpoints: - elf_notes_ingest -> POST /v2/notes/ingest - elf_events_ingest -> POST /v2/events/ingest + - elf_core_blocks_get -> GET /v2/core-blocks - elf_graph_query -> POST /v2/graph/query - elf_searches_create -> POST /v2/searches - elf_searches_get -> GET /v2/searches/{search_id} diff --git a/packages/elf-service/src/core_blocks.rs b/packages/elf-service/src/core_blocks.rs new file mode 100644 index 00000000..3ff42bf9 --- /dev/null +++ b/packages/elf-service/src/core_blocks.rs @@ -0,0 +1,1230 @@ +//! Scoped core memory block APIs. + +use std::collections::{HashMap, HashSet}; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use sqlx::{FromRow, PgExecutor, Postgres, Transaction}; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{ + ElfService, Error, Result, + access::{self, ORG_PROJECT_ID}, + search, +}; +use elf_config::Config; +use elf_domain::english_gate::{self, EnglishGateKind}; + +/// Core memory blocks response schema identifier. +pub const ELF_CORE_MEMORY_BLOCKS_SCHEMA_V1: &str = "elf.core_memory_blocks/v1"; + +const MAX_CORE_BLOCK_CONTENT_CHARS: usize = 2_000; + +/// Request payload for attached core block readback. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlocksGetRequest { + /// Tenant that owns the request. + pub tenant_id: String, + /// Project context for attachment lookup. + pub project_id: String, + /// Agent requesting attached blocks. + pub agent_id: String, + /// Read profile whose exact attachments should be returned. + pub read_profile: String, +} + +/// Response payload for attached core block readback. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlocksResponse { + /// Response schema identifier. + pub schema: String, + /// Tenant that owns the request. + pub tenant_id: String, + /// Project context for attachment lookup. + pub project_id: String, + /// Agent requesting attached blocks. + pub agent_id: String, + /// Read profile used for attachment lookup. + pub read_profile: String, + /// Attached core blocks visible to the caller. + pub items: Vec, +} + +/// One attached core memory block. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockItem { + /// Core block identifier. + pub block_id: Uuid, + /// Active attachment identifier that made the block visible. + pub attachment_id: Uuid, + /// Tenant that owns the block. + pub tenant_id: String, + /// Project that owns the block. + pub project_id: String, + /// Agent that owns the block's scope. + pub agent_id: String, + /// Scope key for the block. + pub scope: String, + /// Stable block key. + pub key: String, + /// Human-readable block title. + pub title: String, + /// Small always-attached context payload. + pub content: String, + /// Structured source/provenance metadata for the block. + pub source_ref: Value, + /// Lifecycle status for the block. + pub status: String, + #[serde(with = "crate::time_serde")] + /// Last block update timestamp. + pub updated_at: OffsetDateTime, + #[serde(with = "crate::time_serde")] + /// Attachment creation timestamp. + pub attached_at: OffsetDateTime, + /// Agent that created the attachment. + pub attached_by_agent_id: String, + /// Append-only block and attachment audit events. + pub audit_history: Vec, +} + +/// One core block audit event. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockAuditEvent { + /// Audit event identifier. + pub event_id: Uuid, + /// Block identifier affected by the event. + pub block_id: Uuid, + /// Attachment identifier affected by the event, when applicable. + pub attachment_id: Option, + /// Agent that performed the event. + pub actor_agent_id: String, + /// Event type. + pub event_type: String, + /// Attachment target agent, when applicable. + pub target_agent_id: Option, + /// Attachment read profile, when applicable. + pub read_profile: Option, + /// Optional previous state snapshot. + pub prev_snapshot: Option, + /// Optional new state snapshot. + pub new_snapshot: Option, + /// Human-readable event reason. + pub reason: String, + #[serde(with = "crate::time_serde")] + /// Event timestamp. + pub ts: OffsetDateTime, +} + +/// Request payload for creating or updating a core block through admin APIs. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockUpsertRequest { + /// Tenant that owns the request. + pub tenant_id: String, + /// Project context for the block. + pub project_id: String, + /// Agent creating or updating the block. + pub agent_id: String, + /// Existing block id to update. Omit to create. + pub block_id: Option, + /// Scope key for the block. + pub scope: String, + /// Stable block key. + pub key: String, + /// Human-readable block title. + pub title: String, + /// Small always-attached context payload. + pub content: String, + /// Structured source/provenance metadata for the block. + pub source_ref: Value, + /// Optional audit reason. + pub reason: Option, +} + +/// Response payload for core block creation or update. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockUpsertResponse { + /// Stored block record. + pub block: CoreBlockRecord, +} + +/// Core block record returned by admin mutation APIs. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockRecord { + /// Core block identifier. + pub block_id: Uuid, + /// Tenant that owns the block. + pub tenant_id: String, + /// Project that owns the block. + pub project_id: String, + /// Agent that owns the block's scope. + pub agent_id: String, + /// Scope key for the block. + pub scope: String, + /// Stable block key. + pub key: String, + /// Human-readable block title. + pub title: String, + /// Small always-attached context payload. + pub content: String, + /// Structured source/provenance metadata for the block. + pub source_ref: Value, + /// Lifecycle status for the block. + pub status: String, + #[serde(with = "crate::time_serde")] + /// Creation timestamp. + pub created_at: OffsetDateTime, + #[serde(with = "crate::time_serde")] + /// Last update timestamp. + pub updated_at: OffsetDateTime, +} + +/// Request payload for attaching a block to an agent/read-profile pair. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockAttachRequest { + /// Tenant that owns the request. + pub tenant_id: String, + /// Project context for the attachment. + pub project_id: String, + /// Agent creating the attachment. + pub agent_id: String, + /// Block to attach. + pub block_id: Uuid, + /// Target agent that should receive the block. + pub target_agent_id: String, + /// Exact read profile for the attachment. + pub read_profile: String, + /// Optional audit reason. + pub reason: Option, +} + +/// Response payload for attaching a core block. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockAttachResponse { + /// Attachment identifier. + pub attachment_id: Uuid, + /// Block identifier. + pub block_id: Uuid, + /// Target agent for the attachment. + pub target_agent_id: String, + /// Exact read profile for the attachment. + pub read_profile: String, + /// Agent that created the attachment. + pub attached_by_agent_id: String, + #[serde(with = "crate::time_serde")] + /// Attachment timestamp. + pub attached_at: OffsetDateTime, +} + +/// Request payload for detaching a block attachment. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockDetachRequest { + /// Tenant that owns the request. + pub tenant_id: String, + /// Project context for the attachment. + pub project_id: String, + /// Agent detaching the block. + pub agent_id: String, + /// Attachment to detach. + pub attachment_id: Uuid, + /// Optional audit reason. + pub reason: Option, +} + +/// Response payload for detaching a core block. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct CoreBlockDetachResponse { + /// Attachment identifier. + pub attachment_id: Uuid, + /// Whether an active attachment was detached. + pub detached: bool, +} + +#[derive(Clone, Debug, FromRow)] +struct CoreBlockRow { + block_id: Uuid, + tenant_id: String, + project_id: String, + agent_id: String, + scope: String, + key: String, + title: String, + content: String, + source_ref: Value, + status: String, + created_at: OffsetDateTime, + updated_at: OffsetDateTime, +} +impl CoreBlockRow { + fn into_record(self) -> CoreBlockRecord { + CoreBlockRecord { + block_id: self.block_id, + tenant_id: self.tenant_id, + project_id: self.project_id, + agent_id: self.agent_id, + scope: self.scope, + key: self.key, + title: self.title, + content: self.content, + source_ref: self.source_ref, + status: self.status, + created_at: self.created_at, + updated_at: self.updated_at, + } + } +} + +#[derive(Clone, Debug, FromRow)] +struct CoreBlockAttachmentRow { + attachment_id: Uuid, + block_id: Uuid, + tenant_id: String, + project_id: String, + agent_id: String, + read_profile: String, + attached_by_agent_id: String, + attached_at: OffsetDateTime, + detached_by_agent_id: Option, + detached_at: Option, +} + +#[derive(Clone, Debug, FromRow)] +struct CoreBlockJoinedRow { + attachment_id: Uuid, + attachment_agent_id: String, + attached_by_agent_id: String, + attached_at: OffsetDateTime, + block_id: Uuid, + tenant_id: String, + project_id: String, + agent_id: String, + scope: String, + key: String, + title: String, + content: String, + source_ref: Value, + status: String, + created_at: OffsetDateTime, + updated_at: OffsetDateTime, +} +impl CoreBlockJoinedRow { + fn into_item(self, audit_by_block: &HashMap>) -> CoreBlockItem { + let audit_history = audit_by_block.get(&self.block_id).cloned().unwrap_or_else(Vec::new); + + CoreBlockItem { + block_id: self.block_id, + attachment_id: self.attachment_id, + tenant_id: self.tenant_id, + project_id: self.project_id, + agent_id: self.agent_id, + scope: self.scope, + key: self.key, + title: self.title, + content: self.content, + source_ref: self.source_ref, + status: self.status, + updated_at: self.updated_at, + attached_at: self.attached_at, + attached_by_agent_id: self.attached_by_agent_id, + audit_history, + } + } +} + +#[derive(Clone, Debug, FromRow)] +struct CoreBlockEventRow { + event_id: Uuid, + block_id: Uuid, + attachment_id: Option, + actor_agent_id: String, + event_type: String, + target_agent_id: Option, + read_profile: Option, + prev_snapshot: Option, + new_snapshot: Option, + reason: String, + ts: OffsetDateTime, +} + +struct PreparedGetRequest { + tenant_id: String, + project_id: String, + agent_id: String, + read_profile: String, + allowed_scopes: Vec, +} + +struct PreparedUpsertRequest { + tenant_id: String, + project_id: String, + agent_id: String, + block_id: Option, + scope: String, + key: String, + title: String, + content: String, + source_ref: Value, + reason: String, +} + +struct PreparedAttachRequest { + tenant_id: String, + project_id: String, + agent_id: String, + block_id: Uuid, + target_agent_id: String, + read_profile: String, + allowed_scopes: Vec, + reason: String, +} + +struct PreparedDetachRequest { + tenant_id: String, + project_id: String, + agent_id: String, + attachment_id: Uuid, + reason: String, +} + +struct CoreBlockEventInput<'a> { + block_id: Uuid, + attachment_id: Option, + tenant_id: &'a str, + project_id: &'a str, + actor_agent_id: &'a str, + event_type: &'a str, + target_agent_id: Option<&'a str>, + read_profile: Option<&'a str>, + prev_snapshot: Option, + new_snapshot: Option, + reason: &'a str, + ts: OffsetDateTime, +} + +impl ElfService { + /// Returns core memory blocks explicitly attached for one agent/read-profile pair. + pub async fn core_blocks_get(&self, req: CoreBlocksGetRequest) -> Result { + let prepared = prepare_get_request(&self.cfg, req)?; + let rows = fetch_attached_block_rows( + &self.db.pool, + prepared.tenant_id.as_str(), + prepared.project_id.as_str(), + prepared.agent_id.as_str(), + prepared.read_profile.as_str(), + ) + .await?; + let shared_grants = access::load_shared_read_grants_with_org_shared( + &self.db.pool, + prepared.tenant_id.as_str(), + prepared.project_id.as_str(), + prepared.agent_id.as_str(), + prepared.allowed_scopes.iter().any(|scope| scope == "org_shared"), + ) + .await?; + let visible_rows = filter_visible_rows(rows, &prepared.allowed_scopes, &shared_grants); + let block_ids = visible_rows.iter().map(|row| row.block_id).collect::>(); + let audit_by_block = fetch_audit_history(&self.db.pool, &block_ids).await?; + let items = + visible_rows.into_iter().map(|row| row.into_item(&audit_by_block)).collect::>(); + + Ok(CoreBlocksResponse { + schema: ELF_CORE_MEMORY_BLOCKS_SCHEMA_V1.to_string(), + tenant_id: prepared.tenant_id, + project_id: prepared.project_id, + agent_id: prepared.agent_id, + read_profile: prepared.read_profile, + items, + }) + } + + /// Creates or updates a core memory block and records append-only audit history. + pub async fn core_block_upsert( + &self, + req: CoreBlockUpsertRequest, + ) -> Result { + let prepared = prepare_upsert_request(&self.cfg, req)?; + let now = OffsetDateTime::now_utc(); + let mut tx = self.db.pool.begin().await?; + let (row, prev_snapshot) = match prepared.block_id { + Some(block_id) => update_core_block(&mut tx, &prepared, block_id, now).await?, + None => (insert_core_block(&mut tx, &prepared, now).await?, None), + }; + + insert_core_block_event( + &mut tx, + CoreBlockEventInput { + block_id: row.block_id, + attachment_id: None, + tenant_id: prepared.tenant_id.as_str(), + project_id: prepared.project_id.as_str(), + actor_agent_id: prepared.agent_id.as_str(), + event_type: if prepared.block_id.is_some() { + "block_updated" + } else { + "block_created" + }, + target_agent_id: None, + read_profile: None, + prev_snapshot, + new_snapshot: Some(block_snapshot(&row)), + reason: prepared.reason.as_str(), + ts: now, + }, + ) + .await?; + + tx.commit().await?; + + Ok(CoreBlockUpsertResponse { block: row.into_record() }) + } + + /// Attaches an active core block to one exact agent/read-profile pair. + pub async fn core_block_attach( + &self, + req: CoreBlockAttachRequest, + ) -> Result { + let prepared = prepare_attach_request(&self.cfg, req)?; + let now = OffsetDateTime::now_utc(); + let mut tx = self.db.pool.begin().await?; + let block = fetch_active_block_for_attachment(&mut tx, &prepared).await?; + let shared_grants = access::load_shared_read_grants_with_org_shared( + &mut *tx, + prepared.tenant_id.as_str(), + prepared.project_id.as_str(), + prepared.target_agent_id.as_str(), + prepared.allowed_scopes.iter().any(|scope| scope == "org_shared"), + ) + .await?; + + if !block_read_allowed( + &block, + prepared.target_agent_id.as_str(), + &prepared.allowed_scopes, + &shared_grants, + ) { + return Err(Error::ScopeDenied { + message: "Block scope is not allowed for this attachment.".to_string(), + }); + } + + let attachment = upsert_core_block_attachment(&mut tx, &prepared, now).await?; + + insert_core_block_event( + &mut tx, + CoreBlockEventInput { + block_id: attachment.block_id, + attachment_id: Some(attachment.attachment_id), + tenant_id: prepared.tenant_id.as_str(), + project_id: prepared.project_id.as_str(), + actor_agent_id: prepared.agent_id.as_str(), + event_type: "attachment_added", + target_agent_id: Some(prepared.target_agent_id.as_str()), + read_profile: Some(prepared.read_profile.as_str()), + prev_snapshot: None, + new_snapshot: Some(attachment_snapshot(&attachment)), + reason: prepared.reason.as_str(), + ts: now, + }, + ) + .await?; + + tx.commit().await?; + + Ok(CoreBlockAttachResponse { + attachment_id: attachment.attachment_id, + block_id: attachment.block_id, + target_agent_id: attachment.agent_id, + read_profile: attachment.read_profile, + attached_by_agent_id: attachment.attached_by_agent_id, + attached_at: attachment.attached_at, + }) + } + + /// Detaches an active core block attachment and records an audit event. + pub async fn core_block_detach( + &self, + req: CoreBlockDetachRequest, + ) -> Result { + let prepared = prepare_detach_request(req)?; + let now = OffsetDateTime::now_utc(); + let mut tx = self.db.pool.begin().await?; + let Some(prev) = fetch_active_attachment_for_update(&mut tx, &prepared).await? else { + tx.commit().await?; + + return Ok(CoreBlockDetachResponse { + attachment_id: prepared.attachment_id, + detached: false, + }); + }; + let updated = detach_core_block_attachment(&mut tx, &prepared, now).await?; + + insert_core_block_event( + &mut tx, + CoreBlockEventInput { + block_id: updated.block_id, + attachment_id: Some(updated.attachment_id), + tenant_id: prepared.tenant_id.as_str(), + project_id: prepared.project_id.as_str(), + actor_agent_id: prepared.agent_id.as_str(), + event_type: "attachment_removed", + target_agent_id: Some(updated.agent_id.as_str()), + read_profile: Some(updated.read_profile.as_str()), + prev_snapshot: Some(attachment_snapshot(&prev)), + new_snapshot: Some(attachment_snapshot(&updated)), + reason: prepared.reason.as_str(), + ts: now, + }, + ) + .await?; + + tx.commit().await?; + + Ok(CoreBlockDetachResponse { attachment_id: updated.attachment_id, detached: true }) + } +} + +fn prepare_get_request(cfg: &Config, req: CoreBlocksGetRequest) -> Result { + let tenant_id = normalize_required(req.tenant_id.as_str(), "tenant_id")?; + let project_id = normalize_required(req.project_id.as_str(), "project_id")?; + let agent_id = normalize_required(req.agent_id.as_str(), "agent_id")?; + let read_profile = normalize_required(req.read_profile.as_str(), "read_profile")?; + let allowed_scopes = search::resolve_read_profile_scopes(cfg, read_profile.as_str())?; + + Ok(PreparedGetRequest { tenant_id, project_id, agent_id, read_profile, allowed_scopes }) +} + +fn prepare_upsert_request( + cfg: &Config, + req: CoreBlockUpsertRequest, +) -> Result { + let tenant_id = normalize_required(req.tenant_id.as_str(), "tenant_id")?; + let requested_project_id = normalize_required(req.project_id.as_str(), "project_id")?; + let agent_id = normalize_required(req.agent_id.as_str(), "agent_id")?; + let scope = normalize_required(req.scope.as_str(), "scope")?; + let key = normalize_required(req.key.as_str(), "key")?; + let title = normalize_required(req.title.as_str(), "title")?; + let content = normalize_required(req.content.as_str(), "content")?; + let reason = req + .reason + .as_deref() + .map(|value| normalize_required(value, "reason")) + .transpose()? + .unwrap_or_else(|| "core block upsert".to_string()); + let project_id = + if scope == "org_shared" { ORG_PROJECT_ID.to_string() } else { requested_project_id }; + + validate_write_scope(cfg, scope.as_str())?; + validate_english(key.as_str(), EnglishGateKind::Identifier, "$.key")?; + validate_english(title.as_str(), EnglishGateKind::NaturalLanguage, "$.title")?; + validate_english(content.as_str(), EnglishGateKind::NaturalLanguage, "$.content")?; + validate_source_ref(&req.source_ref)?; + + if content.chars().count() > MAX_CORE_BLOCK_CONTENT_CHARS { + return Err(Error::InvalidRequest { message: "content is too long.".to_string() }); + } + + Ok(PreparedUpsertRequest { + tenant_id, + project_id, + agent_id, + block_id: req.block_id, + scope, + key, + title, + content, + source_ref: req.source_ref, + reason, + }) +} + +fn prepare_attach_request( + cfg: &Config, + req: CoreBlockAttachRequest, +) -> Result { + let tenant_id = normalize_required(req.tenant_id.as_str(), "tenant_id")?; + let project_id = normalize_required(req.project_id.as_str(), "project_id")?; + let agent_id = normalize_required(req.agent_id.as_str(), "agent_id")?; + let target_agent_id = normalize_required(req.target_agent_id.as_str(), "target_agent_id")?; + let read_profile = normalize_required(req.read_profile.as_str(), "read_profile")?; + let allowed_scopes = search::resolve_read_profile_scopes(cfg, read_profile.as_str())?; + let reason = req + .reason + .as_deref() + .map(|value| normalize_required(value, "reason")) + .transpose()? + .unwrap_or_else(|| "core block attachment".to_string()); + + validate_english(target_agent_id.as_str(), EnglishGateKind::Identifier, "$.target_agent_id")?; + + Ok(PreparedAttachRequest { + tenant_id, + project_id, + agent_id, + block_id: req.block_id, + target_agent_id, + read_profile, + allowed_scopes, + reason, + }) +} + +fn prepare_detach_request(req: CoreBlockDetachRequest) -> Result { + let tenant_id = normalize_required(req.tenant_id.as_str(), "tenant_id")?; + let project_id = normalize_required(req.project_id.as_str(), "project_id")?; + let agent_id = normalize_required(req.agent_id.as_str(), "agent_id")?; + let reason = req + .reason + .as_deref() + .map(|value| normalize_required(value, "reason")) + .transpose()? + .unwrap_or_else(|| "core block detach".to_string()); + + Ok(PreparedDetachRequest { + tenant_id, + project_id, + agent_id, + attachment_id: req.attachment_id, + reason, + }) +} + +fn filter_visible_rows( + rows: Vec, + allowed_scopes: &[String], + shared_grants: &HashSet, +) -> Vec { + rows.into_iter() + .filter(|row| { + let block = CoreBlockRow { + block_id: row.block_id, + tenant_id: row.tenant_id.clone(), + project_id: row.project_id.clone(), + agent_id: row.agent_id.clone(), + scope: row.scope.clone(), + key: row.key.clone(), + title: row.title.clone(), + content: row.content.clone(), + source_ref: row.source_ref.clone(), + status: row.status.clone(), + created_at: row.created_at, + updated_at: row.updated_at, + }; + + block_read_allowed( + &block, + row.attachment_agent_id.as_str(), + allowed_scopes, + shared_grants, + ) + }) + .collect() +} + +fn block_read_allowed( + block: &CoreBlockRow, + requester_agent_id: &str, + allowed_scopes: &[String], + shared_grants: &HashSet, +) -> bool { + if block.status != "active" { + return false; + } + if !allowed_scopes.iter().any(|scope| scope == &block.scope) { + return false; + } + if block.scope == "agent_private" { + return block.agent_id == requester_agent_id; + } + if !matches!(block.scope.as_str(), "project_shared" | "org_shared") { + return false; + } + if block.agent_id == requester_agent_id { + return true; + } + + shared_grants.contains(&access::SharedSpaceGrantKey { + scope: block.scope.clone(), + space_owner_agent_id: block.agent_id.clone(), + }) +} + +fn block_snapshot(block: &CoreBlockRow) -> Value { + serde_json::json!({ + "block_id": block.block_id, + "tenant_id": block.tenant_id, + "project_id": block.project_id, + "agent_id": block.agent_id, + "scope": block.scope, + "key": block.key, + "title": block.title, + "content": block.content, + "source_ref": block.source_ref, + "status": block.status, + "created_at": block.created_at, + "updated_at": block.updated_at, + }) +} + +fn attachment_snapshot(attachment: &CoreBlockAttachmentRow) -> Value { + serde_json::json!({ + "attachment_id": attachment.attachment_id, + "block_id": attachment.block_id, + "tenant_id": attachment.tenant_id, + "project_id": attachment.project_id, + "agent_id": attachment.agent_id, + "read_profile": attachment.read_profile, + "attached_by_agent_id": attachment.attached_by_agent_id, + "attached_at": attachment.attached_at, + "detached_by_agent_id": attachment.detached_by_agent_id, + "detached_at": attachment.detached_at, + }) +} + +fn normalize_required(raw: &str, field: &str) -> Result { + let trimmed = raw.trim(); + + if trimmed.is_empty() { + return Err(Error::InvalidRequest { message: format!("{field} is required.") }); + } + + Ok(trimmed.to_string()) +} + +fn validate_write_scope(cfg: &Config, scope: &str) -> Result<()> { + if !cfg.scopes.allowed.iter().any(|allowed| allowed == scope) { + return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); + } + + let write_allowed = match scope { + "agent_private" => cfg.scopes.write_allowed.agent_private, + "project_shared" => cfg.scopes.write_allowed.project_shared, + "org_shared" => cfg.scopes.write_allowed.org_shared, + _ => false, + }; + + if !write_allowed { + return Err(Error::ScopeDenied { message: "Scope is not allowed.".to_string() }); + } + + Ok(()) +} + +fn validate_english(input: &str, kind: EnglishGateKind, field: &str) -> Result<()> { + english_gate::english_gate(input, kind) + .map_err(|_| Error::NonEnglishInput { field: field.to_string() }) +} + +fn validate_source_ref(source_ref: &Value) -> Result<()> { + if !source_ref.is_object() { + return Err(Error::InvalidRequest { + message: "source_ref must be a JSON object.".to_string(), + }); + } + + Ok(()) +} + +async fn insert_core_block( + tx: &mut Transaction<'_, Postgres>, + req: &PreparedUpsertRequest, + now: OffsetDateTime, +) -> Result { + ensure_no_active_key_conflict(tx, req, None).await?; + + sqlx::query_as::<_, CoreBlockRow>( + "\ +INSERT INTO core_memory_blocks ( + block_id, + tenant_id, + project_id, + agent_id, + scope, + key, + title, + content, + source_ref, + status, + created_at, + updated_at +) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'active', $10, $10) +RETURNING *", + ) + .bind(Uuid::new_v4()) + .bind(req.tenant_id.as_str()) + .bind(req.project_id.as_str()) + .bind(req.agent_id.as_str()) + .bind(req.scope.as_str()) + .bind(req.key.as_str()) + .bind(req.title.as_str()) + .bind(req.content.as_str()) + .bind(&req.source_ref) + .bind(now) + .fetch_one(&mut **tx) + .await + .map_err(Into::into) +} + +async fn update_core_block( + tx: &mut Transaction<'_, Postgres>, + req: &PreparedUpsertRequest, + block_id: Uuid, + now: OffsetDateTime, +) -> Result<(CoreBlockRow, Option)> { + let prev = fetch_owned_block_for_update(tx, req, block_id).await?; + let prev_snapshot = Some(block_snapshot(&prev)); + + ensure_no_active_key_conflict(tx, req, Some(block_id)).await?; + + let row = sqlx::query_as::<_, CoreBlockRow>( + "\ +UPDATE core_memory_blocks +SET + key = $6, + title = $7, + content = $8, + source_ref = $9, + updated_at = $10 +WHERE block_id = $1 + AND tenant_id = $2 + AND project_id = $3 + AND agent_id = $4 + AND scope = $5 + AND status = 'active' +RETURNING *", + ) + .bind(block_id) + .bind(req.tenant_id.as_str()) + .bind(req.project_id.as_str()) + .bind(req.agent_id.as_str()) + .bind(req.scope.as_str()) + .bind(req.key.as_str()) + .bind(req.title.as_str()) + .bind(req.content.as_str()) + .bind(&req.source_ref) + .bind(now) + .fetch_optional(&mut **tx) + .await? + .ok_or_else(|| Error::NotFound { message: "Core block not found.".to_string() })?; + + Ok((row, prev_snapshot)) +} + +async fn fetch_owned_block_for_update( + tx: &mut Transaction<'_, Postgres>, + req: &PreparedUpsertRequest, + block_id: Uuid, +) -> Result { + sqlx::query_as::<_, CoreBlockRow>( + "\ +SELECT * +FROM core_memory_blocks +WHERE block_id = $1 + AND tenant_id = $2 + AND project_id = $3 + AND agent_id = $4 + AND scope = $5 + AND status = 'active' +FOR UPDATE", + ) + .bind(block_id) + .bind(req.tenant_id.as_str()) + .bind(req.project_id.as_str()) + .bind(req.agent_id.as_str()) + .bind(req.scope.as_str()) + .fetch_optional(&mut **tx) + .await? + .ok_or_else(|| Error::NotFound { message: "Core block not found.".to_string() }) +} + +async fn ensure_no_active_key_conflict( + tx: &mut Transaction<'_, Postgres>, + req: &PreparedUpsertRequest, + block_id: Option, +) -> Result<()> { + let conflict: Option = sqlx::query_scalar( + "\ +SELECT block_id +FROM core_memory_blocks +WHERE tenant_id = $1 + AND project_id = $2 + AND agent_id = $3 + AND scope = $4 + AND key = $5 + AND status = 'active' + AND ($6::uuid IS NULL OR block_id <> $6) +LIMIT 1", + ) + .bind(req.tenant_id.as_str()) + .bind(req.project_id.as_str()) + .bind(req.agent_id.as_str()) + .bind(req.scope.as_str()) + .bind(req.key.as_str()) + .bind(block_id) + .fetch_optional(&mut **tx) + .await?; + + if conflict.is_some() { + return Err(Error::Conflict { message: "Core block key already exists.".to_string() }); + } + + Ok(()) +} + +async fn fetch_active_block_for_attachment( + tx: &mut Transaction<'_, Postgres>, + req: &PreparedAttachRequest, +) -> Result { + sqlx::query_as::<_, CoreBlockRow>( + "\ +SELECT * +FROM core_memory_blocks +WHERE block_id = $1 + AND tenant_id = $2 + AND status = 'active' + AND ( + project_id = $3 + OR (project_id = $4 AND scope = 'org_shared') + )", + ) + .bind(req.block_id) + .bind(req.tenant_id.as_str()) + .bind(req.project_id.as_str()) + .bind(ORG_PROJECT_ID) + .fetch_optional(&mut **tx) + .await? + .ok_or_else(|| Error::NotFound { message: "Core block not found.".to_string() }) +} + +async fn upsert_core_block_attachment( + tx: &mut Transaction<'_, Postgres>, + req: &PreparedAttachRequest, + now: OffsetDateTime, +) -> Result { + sqlx::query_as::<_, CoreBlockAttachmentRow>( + "\ +INSERT INTO core_memory_block_attachments ( + attachment_id, + block_id, + tenant_id, + project_id, + agent_id, + read_profile, + attached_by_agent_id, + attached_at +) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +ON CONFLICT (tenant_id, project_id, agent_id, read_profile, block_id) +WHERE detached_at IS NULL +DO UPDATE +SET + attached_by_agent_id = EXCLUDED.attached_by_agent_id, + attached_at = EXCLUDED.attached_at, + detached_by_agent_id = NULL, + detached_at = NULL +RETURNING *", + ) + .bind(Uuid::new_v4()) + .bind(req.block_id) + .bind(req.tenant_id.as_str()) + .bind(req.project_id.as_str()) + .bind(req.target_agent_id.as_str()) + .bind(req.read_profile.as_str()) + .bind(req.agent_id.as_str()) + .bind(now) + .fetch_one(&mut **tx) + .await + .map_err(Into::into) +} + +async fn fetch_active_attachment_for_update( + tx: &mut Transaction<'_, Postgres>, + req: &PreparedDetachRequest, +) -> Result> { + sqlx::query_as::<_, CoreBlockAttachmentRow>( + "\ +SELECT * +FROM core_memory_block_attachments +WHERE attachment_id = $1 + AND tenant_id = $2 + AND project_id = $3 + AND detached_at IS NULL +FOR UPDATE", + ) + .bind(req.attachment_id) + .bind(req.tenant_id.as_str()) + .bind(req.project_id.as_str()) + .fetch_optional(&mut **tx) + .await + .map_err(Into::into) +} + +async fn detach_core_block_attachment( + tx: &mut Transaction<'_, Postgres>, + req: &PreparedDetachRequest, + now: OffsetDateTime, +) -> Result { + sqlx::query_as::<_, CoreBlockAttachmentRow>( + "\ +UPDATE core_memory_block_attachments +SET + detached_by_agent_id = $4, + detached_at = $5 +WHERE attachment_id = $1 + AND tenant_id = $2 + AND project_id = $3 + AND detached_at IS NULL +RETURNING *", + ) + .bind(req.attachment_id) + .bind(req.tenant_id.as_str()) + .bind(req.project_id.as_str()) + .bind(req.agent_id.as_str()) + .bind(now) + .fetch_one(&mut **tx) + .await + .map_err(Into::into) +} + +async fn fetch_attached_block_rows<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + agent_id: &str, + read_profile: &str, +) -> Result> +where + E: PgExecutor<'e>, +{ + sqlx::query_as::<_, CoreBlockJoinedRow>( + "\ +SELECT + a.attachment_id, + a.agent_id AS attachment_agent_id, + a.attached_by_agent_id, + a.attached_at, + b.block_id, + b.tenant_id, + b.project_id, + b.agent_id, + b.scope, + b.key, + b.title, + b.content, + b.source_ref, + b.status, + b.created_at, + b.updated_at +FROM core_memory_block_attachments a +JOIN core_memory_blocks b ON b.block_id = a.block_id +WHERE a.tenant_id = $1 + AND a.project_id = $2 + AND a.agent_id = $3 + AND a.read_profile = $4 + AND a.detached_at IS NULL + AND b.status = 'active' +ORDER BY a.attached_at ASC, b.key ASC", + ) + .bind(tenant_id) + .bind(project_id) + .bind(agent_id) + .bind(read_profile) + .fetch_all(executor) + .await + .map_err(Into::into) +} + +async fn fetch_audit_history<'e, E>( + executor: E, + block_ids: &[Uuid], +) -> Result>> +where + E: PgExecutor<'e>, +{ + if block_ids.is_empty() { + return Ok(HashMap::new()); + } + + let rows = sqlx::query_as::<_, CoreBlockEventRow>( + "\ +SELECT + event_id, + block_id, + attachment_id, + actor_agent_id, + event_type, + target_agent_id, + read_profile, + prev_snapshot, + new_snapshot, + reason, + ts +FROM core_memory_block_events +WHERE block_id = ANY($1) +ORDER BY ts ASC, event_id ASC", + ) + .bind(block_ids) + .fetch_all(executor) + .await?; + let mut by_block: HashMap> = HashMap::new(); + + for row in rows { + by_block.entry(row.block_id).or_default().push(CoreBlockAuditEvent { + event_id: row.event_id, + block_id: row.block_id, + attachment_id: row.attachment_id, + actor_agent_id: row.actor_agent_id, + event_type: row.event_type, + target_agent_id: row.target_agent_id, + read_profile: row.read_profile, + prev_snapshot: row.prev_snapshot, + new_snapshot: row.new_snapshot, + reason: row.reason, + ts: row.ts, + }); + } + + Ok(by_block) +} + +async fn insert_core_block_event( + tx: &mut Transaction<'_, Postgres>, + event: CoreBlockEventInput<'_>, +) -> Result<()> { + sqlx::query( + "\ +INSERT INTO core_memory_block_events ( + event_id, + block_id, + attachment_id, + tenant_id, + project_id, + actor_agent_id, + event_type, + target_agent_id, + read_profile, + prev_snapshot, + new_snapshot, + reason, + ts +) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)", + ) + .bind(Uuid::new_v4()) + .bind(event.block_id) + .bind(event.attachment_id) + .bind(event.tenant_id) + .bind(event.project_id) + .bind(event.actor_agent_id) + .bind(event.event_type) + .bind(event.target_agent_id) + .bind(event.read_profile) + .bind(event.prev_snapshot) + .bind(event.new_snapshot) + .bind(event.reason) + .bind(event.ts) + .execute(&mut **tx) + .await?; + + Ok(()) +} diff --git a/packages/elf-service/src/lib.rs b/packages/elf-service/src/lib.rs index e784e4b0..726e1e87 100644 --- a/packages/elf-service/src/lib.rs +++ b/packages/elf-service/src/lib.rs @@ -7,6 +7,7 @@ pub mod add_note; pub mod admin; pub mod admin_graph_predicates; pub mod consolidation; +pub mod core_blocks; pub mod delete; pub mod docs; pub mod graph; @@ -46,6 +47,12 @@ pub use self::{ ConsolidationRunCreateRequest, ConsolidationRunCreateResponse, ConsolidationRunGetRequest, ConsolidationRunResponse, ConsolidationRunsListRequest, ConsolidationRunsListResponse, }, + core_blocks::{ + CoreBlockAttachRequest, CoreBlockAttachResponse, CoreBlockDetachRequest, + CoreBlockDetachResponse, CoreBlockItem, CoreBlockRecord, CoreBlockUpsertRequest, + CoreBlockUpsertResponse, CoreBlocksGetRequest, CoreBlocksResponse, + ELF_CORE_MEMORY_BLOCKS_SCHEMA_V1, + }, delete::{DeleteRequest, DeleteResponse}, docs::{ DocType, DocsExcerptResponse, DocsExcerptsGetRequest, DocsGetRequest, DocsGetResponse, diff --git a/packages/elf-storage/src/schema.rs b/packages/elf-storage/src/schema.rs index e12d31a7..9bbafc56 100644 --- a/packages/elf-storage/src/schema.rs +++ b/packages/elf-storage/src/schema.rs @@ -94,6 +94,13 @@ fn expand_includes(sql: &str) -> String { "tables/038_knowledge_page_lint_findings.sql" => out.push_str(include_str!( "../../../sql/tables/038_knowledge_page_lint_findings.sql" )), + "tables/039_core_memory_blocks.sql" => + out.push_str(include_str!("../../../sql/tables/039_core_memory_blocks.sql")), + "tables/040_core_memory_block_attachments.sql" => out.push_str(include_str!( + "../../../sql/tables/040_core_memory_block_attachments.sql" + )), + "tables/041_core_memory_block_events.sql" => out + .push_str(include_str!("../../../sql/tables/041_core_memory_block_events.sql")), "tables/023_memory_ingest_decisions.sql" => out .push_str(include_str!("../../../sql/tables/023_memory_ingest_decisions.sql")), "tables/024_memory_space_grants.sql" => @@ -126,5 +133,8 @@ mod tests { assert!(schema.contains("CREATE TABLE IF NOT EXISTS knowledge_page_sections")); assert!(schema.contains("CREATE TABLE IF NOT EXISTS knowledge_page_source_refs")); assert!(schema.contains("CREATE TABLE IF NOT EXISTS knowledge_page_lint_findings")); + assert!(schema.contains("CREATE TABLE IF NOT EXISTS core_memory_blocks")); + assert!(schema.contains("CREATE TABLE IF NOT EXISTS core_memory_block_attachments")); + assert!(schema.contains("CREATE TABLE IF NOT EXISTS core_memory_block_events")); } } diff --git a/sql/init.sql b/sql/init.sql index 98b2ee45..99641a31 100644 --- a/sql/init.sql +++ b/sql/init.sql @@ -37,3 +37,6 @@ \ir tables/036_knowledge_page_sections.sql \ir tables/037_knowledge_page_source_refs.sql \ir tables/038_knowledge_page_lint_findings.sql +\ir tables/039_core_memory_blocks.sql +\ir tables/040_core_memory_block_attachments.sql +\ir tables/041_core_memory_block_events.sql diff --git a/sql/tables/039_core_memory_blocks.sql b/sql/tables/039_core_memory_blocks.sql new file mode 100644 index 00000000..76ad8604 --- /dev/null +++ b/sql/tables/039_core_memory_blocks.sql @@ -0,0 +1,27 @@ +CREATE TABLE IF NOT EXISTS core_memory_blocks ( + block_id uuid PRIMARY KEY, + tenant_id text NOT NULL, + project_id text NOT NULL, + agent_id text NOT NULL, + scope text NOT NULL, + key text NOT NULL, + title text NOT NULL, + content text NOT NULL, + source_ref jsonb NOT NULL, + status text NOT NULL, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + CONSTRAINT ck_core_memory_blocks_scope + CHECK (scope IN ('agent_private', 'project_shared', 'org_shared')), + CONSTRAINT ck_core_memory_blocks_status + CHECK (status IN ('active', 'archived')), + CONSTRAINT ck_core_memory_blocks_source_ref_object + CHECK (jsonb_typeof(source_ref) = 'object') +); + +CREATE UNIQUE INDEX IF NOT EXISTS uq_core_memory_blocks_active_key + ON core_memory_blocks (tenant_id, project_id, agent_id, scope, key) + WHERE status = 'active'; + +CREATE INDEX IF NOT EXISTS idx_core_memory_blocks_scope_status + ON core_memory_blocks (tenant_id, project_id, scope, status); diff --git a/sql/tables/040_core_memory_block_attachments.sql b/sql/tables/040_core_memory_block_attachments.sql new file mode 100644 index 00000000..55fc0229 --- /dev/null +++ b/sql/tables/040_core_memory_block_attachments.sql @@ -0,0 +1,24 @@ +CREATE TABLE IF NOT EXISTS core_memory_block_attachments ( + attachment_id uuid PRIMARY KEY, + block_id uuid NOT NULL REFERENCES core_memory_blocks(block_id) ON DELETE CASCADE, + tenant_id text NOT NULL, + project_id text NOT NULL, + agent_id text NOT NULL, + read_profile text NOT NULL, + attached_by_agent_id text NOT NULL, + attached_at timestamptz NOT NULL DEFAULT now(), + detached_by_agent_id text NULL, + detached_at timestamptz NULL, + CONSTRAINT ck_core_memory_block_attachments_read_profile + CHECK (read_profile IN ('private_only', 'private_plus_project', 'all_scopes')) +); + +CREATE UNIQUE INDEX IF NOT EXISTS uq_core_memory_block_attachments_active + ON core_memory_block_attachments (tenant_id, project_id, agent_id, read_profile, block_id) + WHERE detached_at IS NULL; + +CREATE INDEX IF NOT EXISTS idx_core_memory_block_attachments_read + ON core_memory_block_attachments (tenant_id, project_id, agent_id, read_profile, detached_at); + +CREATE INDEX IF NOT EXISTS idx_core_memory_block_attachments_block + ON core_memory_block_attachments (block_id, detached_at); diff --git a/sql/tables/041_core_memory_block_events.sql b/sql/tables/041_core_memory_block_events.sql new file mode 100644 index 00000000..b6033847 --- /dev/null +++ b/sql/tables/041_core_memory_block_events.sql @@ -0,0 +1,30 @@ +CREATE TABLE IF NOT EXISTS core_memory_block_events ( + event_id uuid PRIMARY KEY, + block_id uuid NOT NULL REFERENCES core_memory_blocks(block_id) ON DELETE CASCADE, + attachment_id uuid NULL REFERENCES core_memory_block_attachments(attachment_id) ON DELETE SET NULL, + tenant_id text NOT NULL, + project_id text NOT NULL, + actor_agent_id text NOT NULL, + event_type text NOT NULL, + target_agent_id text NULL, + read_profile text NULL, + prev_snapshot jsonb NULL, + new_snapshot jsonb NULL, + reason text NOT NULL, + ts timestamptz NOT NULL DEFAULT now(), + CONSTRAINT ck_core_memory_block_events_event_type + CHECK ( + event_type IN ( + 'block_created', + 'block_updated', + 'attachment_added', + 'attachment_removed' + ) + ) +); + +CREATE INDEX IF NOT EXISTS idx_core_memory_block_events_block_ts + ON core_memory_block_events (block_id, ts); + +CREATE INDEX IF NOT EXISTS idx_core_memory_block_events_attachment_ts + ON core_memory_block_events (attachment_id, ts);