diff --git a/apps/elf-api/src/routes.rs b/apps/elf-api/src/routes.rs index aea1804..a3aa28b 100644 --- a/apps/elf-api/src/routes.rs +++ b/apps/elf-api/src/routes.rs @@ -30,7 +30,7 @@ use elf_domain::{ ConsolidationReviewState, }, english_gate, - knowledge::KnowledgePageKind, + knowledge::{KnowledgePageKind, KnowledgeSourceKind}, writegate::WritePolicy, }; use elf_service::{ @@ -55,10 +55,11 @@ use elf_service::{ DocsSearchL0Response, DreamingReviewQueueRequest, DreamingReviewQueueResponse, EntityMemoryViewRequest, EntityMemoryViewResponse, Error, EventMessage, GranteeKind, GraphQueryEntityRef, GraphQueryPredicateRef, GraphQueryRequest, GraphQueryResponse, - GraphReportRequest, GraphReportResponse, IngestionProfileSelector, KnowledgePageGetRequest, - KnowledgePageLintRequest, KnowledgePageLintResponse, KnowledgePageRebuildRequest, - KnowledgePageRebuildResponse, KnowledgePageResponse, KnowledgePageSearchRequest, - KnowledgePageSearchResponse, KnowledgePagesListRequest, KnowledgePagesListResponse, + GraphReportRequest, GraphReportResponse, IngestionProfileSelector, KnowledgePageChangedSource, + KnowledgePageGetRequest, KnowledgePageLintRequest, KnowledgePageLintResponse, + KnowledgePageRebuildRequest, KnowledgePageRebuildResponse, KnowledgePageResponse, + KnowledgePageSearchRequest, KnowledgePageSearchResponse, KnowledgePageWatchRebuildRequest, + KnowledgePageWatchRebuildResponse, KnowledgePagesListRequest, KnowledgePagesListResponse, ListRequest, ListResponse, MemoryCorrectionAction, MemoryCorrectionRequest, MemoryCorrectionResponse, MemoryHistoryGetRequest, MemoryHistoryResponse, NoteFetchRequest, NoteFetchResponse, NoteProvenanceBundleResponse, NoteProvenanceGetRequest, PayloadLevel, @@ -151,6 +152,7 @@ const VIEWER_HTML: &str = include_str!("../static/viewer.html"); dreaming_review_queue, recall_debug_panel, knowledge_page_rebuild, + knowledge_pages_watch_rebuild, knowledge_pages_list, knowledge_pages_search, knowledge_page_get, @@ -446,6 +448,20 @@ struct KnowledgePageRebuildBody { provider_metadata: Value, } +#[derive(Clone, Debug, Deserialize)] +struct KnowledgePageChangedSourceBody { + source_kind: KnowledgeSourceKind, + source_id: Uuid, +} + +#[derive(Clone, Debug, Deserialize)] +struct KnowledgePageWatchRebuildBody { + changed_sources: Vec, + page_kind: Option, + limit: Option, + generate_memory_candidates: Option, +} + #[derive(Clone, Debug, Deserialize)] struct KnowledgePagesListQuery { page_kind: Option, @@ -785,6 +801,10 @@ pub fn admin_router(state: AppState) -> Router { .route("/v2/admin/recall-debug/panel", routing::post(admin_recall_debug_panel)) .route("/v2/admin/knowledge/pages", routing::get(knowledge_pages_list)) .route("/v2/admin/knowledge/pages/rebuild", routing::post(knowledge_page_rebuild)) + .route( + "/v2/admin/knowledge/pages/rebuild-changed-sources", + routing::post(knowledge_pages_watch_rebuild), + ) .route("/v2/admin/knowledge/pages/search", routing::post(knowledge_pages_search)) .route("/v2/admin/knowledge/pages/{page_id}", routing::get(knowledge_page_get)) .route("/v2/admin/knowledge/pages/{page_id}/lint", routing::post(knowledge_page_lint)) @@ -3303,6 +3323,54 @@ async fn knowledge_page_rebuild( Ok(Json(response)) } +#[utoipa::path( + post, + path = "/v2/admin/knowledge/pages/rebuild-changed-sources", + tag = "knowledge", + request_body = Value, + responses( + (status = 200, description = "Affected knowledge pages were rebuilt.", body = Value), + (status = 400, description = "Invalid request.", body = ErrorBody), + (status = 401, description = "Authentication required.", body = ErrorBody), + (status = 403, description = "Admin access required.", body = ErrorBody), + (status = 500, description = "Internal error.", body = ErrorBody), + ) +)] +async fn knowledge_pages_watch_rebuild( + State(state): State, + headers: HeaderMap, + payload: Result, JsonRejection>, +) -> Result, ApiError> { + let ctx = RequestContext::from_headers(&headers)?; + let Json(payload) = payload.map_err(|err| { + tracing::warn!(error = %err, "Invalid request payload."); + + json_error(StatusCode::BAD_REQUEST, "INVALID_REQUEST", "Invalid request payload.", None) + })?; + let changed_sources = payload + .changed_sources + .into_iter() + .map(|source| KnowledgePageChangedSource { + source_kind: source.source_kind, + source_id: source.source_id, + }) + .collect(); + let response = state + .service + .knowledge_pages_watch_rebuild(KnowledgePageWatchRebuildRequest { + tenant_id: ctx.tenant_id, + project_id: ctx.project_id, + agent_id: ctx.agent_id, + changed_sources, + page_kind: payload.page_kind, + limit: payload.limit, + generate_memory_candidates: payload.generate_memory_candidates.unwrap_or(true), + }) + .await?; + + Ok(Json(response)) +} + #[utoipa::path( get, path = "/v2/admin/knowledge/pages", diff --git a/docs/evidence/2026-06-22-knowledge-watch-rebuild-drift-audit.md b/docs/evidence/2026-06-22-knowledge-watch-rebuild-drift-audit.md new file mode 100644 index 0000000..0b9a6b8 --- /dev/null +++ b/docs/evidence/2026-06-22-knowledge-watch-rebuild-drift-audit.md @@ -0,0 +1,89 @@ +--- +type: Drift Audit +title: "Knowledge Watch/Rebuild Drift Audit" +description: "Drift audit for the changed-source Knowledge Workspace rebuild and reviewable memory-candidate contract." +resource: docs/evidence/2026-06-22-knowledge-watch-rebuild-drift-audit.md +status: active +authority: current_state +owner: docs +last_verified: 2026-06-22 +tags: + - docs + - drift-audit + - knowledge-workspace +source_refs: [] +code_refs: + - apps/elf-api/src/routes.rs + - packages/elf-domain/src/knowledge.rs + - packages/elf-service/src/knowledge.rs + - packages/elf-storage/src/knowledge.rs + - docs/spec/system_knowledge_pages_v1.md + - docs/spec/system_elf_memory_service_v2.md +related: + - docs/spec/system_knowledge_pages_v1.md + - docs/spec/system_consolidation_proposals_v1.md +drift_watch: + - apps/elf-api/src/routes.rs + - packages/elf-domain/src/knowledge.rs + - packages/elf-service/src/knowledge.rs + - packages/elf-storage/src/knowledge.rs + - docs/spec/system_knowledge_pages_v1.md + - docs/spec/system_elf_memory_service_v2.md +--- +# Knowledge Watch/Rebuild Drift Audit + +Purpose: Anchor the changed-source Knowledge Workspace rebuild contract to the +current service and API surfaces. +Read this when: You need evidence boundaries for the knowledge watch/rebuild API, +section-state output, or reviewable memory-candidate proposal path. +Not this document: Benchmark interpretation, external product comparison, or +operator setup procedure. + +## Watched Claims + +- `POST /v2/admin/knowledge/pages/rebuild-changed-sources` is the admin entrypoint + for changed-source rebuilds. +- Changed-source rebuilds select only pages that already cite supplied source refs. +- Rebuild output reports changed, unchanged, stale, and blocked page/section states. +- Rebuild output emits stale-section, changed-claim, missing-citation, and conflict + classifications. +- Memory candidates from knowledge deltas are queued through consolidation proposals + and do not mutate source records or memory notes directly. + +## Evidence Anchors + +- `packages/elf-storage/src/knowledge.rs` owns affected-page lookup by source ref. +- `packages/elf-service/src/knowledge.rs` owns changed-source lint, rebuild, + section-state output, and memory-candidate proposal payload construction. +- `apps/elf-api/src/routes.rs` exposes the admin route. +- `docs/spec/system_knowledge_pages_v1.md` owns the normative contract. +- `docs/spec/system_consolidation_proposals_v1.md` owns the reviewable memory + promotion path. + +## Reverse Checks + +- Run `cargo make check-rust` after code changes to verify the route/service/storage + surface compiles. +- Run `cargo make check-docs` after docs changes to verify links and task references. +- Run the focused knowledge service tests before claiming the source-update and stale + page cases are covered. + +## Verdict + +pass + +## Required Updates + +- Keep `docs/spec/system_knowledge_pages_v1.md` aligned with any future changes to + watch/rebuild response fields, output classifications, or proposal routing. +- The repository-native docs gate passed for this lane. The stricter Decodex docs + profile still reports unrelated P1 closeout report metadata shape issues outside + this lane. + +## Citations + +- `apps/elf-api/src/routes.rs` +- `packages/elf-service/src/knowledge.rs` +- `packages/elf-storage/src/knowledge.rs` +- `docs/spec/system_knowledge_pages_v1.md` +- `docs/spec/system_consolidation_proposals_v1.md` diff --git a/docs/evidence/index.md b/docs/evidence/index.md index c726739..4cdac5d 100644 --- a/docs/evidence/index.md +++ b/docs/evidence/index.md @@ -17,5 +17,8 @@ Routes to: Drift audits and evidence concepts under `docs/evidence/`. migration. - `2026-06-18-research-artifact-disposition.md`: Evidence record for promoted, carried-forward, moved, and deleted legacy research JSON artifacts. +- `2026-06-22-knowledge-watch-rebuild-drift-audit.md`: Drift audit for the + changed-source Knowledge Workspace rebuild and reviewable memory-candidate + proposal contract. - `external_memory_pattern_radar_latest.md`: Latest weekly external memory pattern radar summary. diff --git a/docs/log.md b/docs/log.md index f58e258..fdae139 100644 --- a/docs/log.md +++ b/docs/log.md @@ -87,3 +87,7 @@ logs. `cargo make real-world-memory-p1-closeout`, preserving the Source Library -> Memory Candidate -> approved memory -> recall/debug -> correction/rollback authority chain and keeping P2 queueing conditional on main-thread acceptance. +- Added the Knowledge Workspace changed-source watch/rebuild contract for XY-1065, + plus a drift audit covering the new admin rebuild endpoint, changed/unchanged/ + stale/blocked section output, stale-section/changed-claim/missing-citation/conflict + classifications, and reviewable memory-candidate proposal routing. diff --git a/docs/spec/system_elf_memory_service_v2.md b/docs/spec/system_elf_memory_service_v2.md index ad04760..3c325d2 100644 --- a/docs/spec/system_elf_memory_service_v2.md +++ b/docs/spec/system_elf_memory_service_v2.md @@ -1182,6 +1182,7 @@ Behavior: Admin derived knowledge pages: - POST /v2/admin/knowledge/pages/rebuild +- POST /v2/admin/knowledge/pages/rebuild-changed-sources - GET /v2/admin/knowledge/pages - POST /v2/admin/knowledge/pages/search - GET /v2/admin/knowledge/pages/{page_id} @@ -1192,6 +1193,11 @@ Behavior: lint for derived knowledge pages. The search endpoint exposes derived page section snippets with visible citations, source coverage, lint summary, trust state, and repair/rebuild guidance. +- The changed-source rebuild endpoint accepts changed source refs, finds only pages + already citing those refs, lints before rebuilding, returns changed/unchanged/stale/ + blocked page and section states, emits stale-section, changed-claim, + missing-citation, and conflict outputs, and may queue reviewable memory candidates + through consolidation proposals. - Page payloads must follow `elf.knowledge_page/v1`, preserve section citations, and write normalized source refs for lint. - Pages are derived and rebuildable; rebuilding or linting a page must not mutate diff --git a/docs/spec/system_knowledge_pages_v1.md b/docs/spec/system_knowledge_pages_v1.md index 022c6dc..a128cbf 100644 --- a/docs/spec/system_knowledge_pages_v1.md +++ b/docs/spec/system_knowledge_pages_v1.md @@ -12,6 +12,7 @@ tags: - spec source_refs: [] code_refs: + - apps/elf-api/src/routes.rs - packages/elf-domain/src/knowledge.rs - packages/elf-service/src/knowledge.rs - packages/elf-storage/src/knowledge.rs @@ -20,6 +21,7 @@ code_refs: related: [] drift_watch: - docs/spec/system_knowledge_pages_v1.md + - apps/elf-api/src/routes.rs - packages/elf-domain/src/knowledge.rs - packages/elf-service/src/knowledge.rs - packages/elf-storage/src/knowledge.rs @@ -150,6 +152,42 @@ When future provider-backed or LLM-derived page text is persisted, `rebuild_metadata.deterministic` must be false unless the provider output is fully replayable from recorded metadata. +## Changed-Source Watch/Rebuild Contract + +The changed-source watch/rebuild path exposes the deterministic operational loop for +source changes. Its response schema is `elf.knowledge_page.watch_rebuild/v1`. + +Input: + +- `changed_sources`: non-empty list of source refs with `source_kind` and `source_id`. +- `page_kind`: optional page-kind filter. +- `limit`: optional affected-page limit. +- `generate_memory_candidates`: optional boolean, default `true`. + +Behavior: + +- The service must look up only knowledge pages that already cite one of the supplied + changed source refs. It must not rebuild unrelated pages. +- For each affected page, the service must lint the currently stored page first, then + rebuild from that page's stored normalized source refs and current authoritative + source rows. +- A page that cannot resolve all stored sources or cannot rebuild must be returned as + `blocked` with an operator-readable reason. Other page states are `changed`, + `unchanged`, or `stale`. +- Per-section output must classify `changed`, `unchanged`, `stale`, or `blocked` + sections. Classified outputs must include: + - `stale_section` for stale or missing stored source snapshots. + - `changed_claim` for sections whose derived content changed after rebuild. + - `missing_citation` for citation or normalized backlink gaps. + - `conflict` when a stale stored section also changes after current-source rebuild. +- Responses must include operator-readable summary lines with affected, changed, + unchanged, stale, blocked, and memory-candidate counts. + +The watch/rebuild path is a derived-artifact operation. It may update +`knowledge_pages`, `knowledge_page_sections`, `knowledge_page_source_refs`, and +`knowledge_page_lint_findings` for affected pages only. It must not mutate +authoritative source notes, docs, events, graph facts, traces, or source pointers. + ## Lint Contract The v1 lint path compares stored normalized source refs with current source rows. @@ -193,6 +231,13 @@ When a page section becomes candidate memory, the candidate must be represented proposal follows the Memory Promotion Apply Contract in `system_consolidation_proposals_v1.md`. +Changed-source watch/rebuild may generate `MemoryCandidate` proposal payloads from +`changed_claim` or `conflict` knowledge deltas. These candidates must carry source +refs, source snapshots, a reason, a reviewable diff, and a proposed memory payload. +The service must route them through a queued consolidation run on the +`consolidation_proposals` review surface; it must not directly write the memory +ledger. + ## Search and Viewer Readback Knowledge page search is a derived-artifact readback surface, not the authoritative @@ -223,6 +268,7 @@ authoritative memory notes. Minimal admin readback endpoints: - `POST /v2/admin/knowledge/pages/rebuild` +- `POST /v2/admin/knowledge/pages/rebuild-changed-sources` - `GET /v2/admin/knowledge/pages` - `POST /v2/admin/knowledge/pages/search` - `GET /v2/admin/knowledge/pages/{page_id}` diff --git a/packages/elf-domain/src/knowledge.rs b/packages/elf-domain/src/knowledge.rs index 5ec3cc0..9a9b554 100644 --- a/packages/elf-domain/src/knowledge.rs +++ b/packages/elf-domain/src/knowledge.rs @@ -10,6 +10,8 @@ pub const KNOWLEDGE_PAGE_REBUILD_SCHEMA_V1: &str = "elf.knowledge_page.rebuild/v pub const KNOWLEDGE_PAGE_SOURCE_COVERAGE_SCHEMA_V1: &str = "elf.knowledge_page.source_coverage/v1"; /// Current previous-version diff metadata schema identifier. pub const KNOWLEDGE_PAGE_VERSION_DIFF_SCHEMA_V1: &str = "elf.knowledge_page.version_diff/v1"; +/// Current changed-source watch/rebuild response schema identifier. +pub const KNOWLEDGE_PAGE_WATCH_REBUILD_SCHEMA_V1: &str = "elf.knowledge_page.watch_rebuild/v1"; /// Derived knowledge page category. #[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)] diff --git a/packages/elf-service/src/knowledge.rs b/packages/elf-service/src/knowledge.rs index 93071e4..aafc7f7 100644 --- a/packages/elf-service/src/knowledge.rs +++ b/packages/elf-service/src/knowledge.rs @@ -8,13 +8,23 @@ use sqlx::{Postgres, Transaction}; use time::OffsetDateTime; use uuid::Uuid; -use crate::{ElfService, Error, Result}; +use crate::{ + ElfService, Error, Result, + consolidation::{ + ConsolidationProposalInput, ConsolidationRunCreateRequest, ConsolidationRunCreateResponse, + }, +}; use elf_domain::{ + consolidation::{ + ConsolidationApplyIntent, ConsolidationInputRef, ConsolidationLineage, ConsolidationMarker, + ConsolidationMarkerSeverity, ConsolidationMarkers, ConsolidationProposalDiff, + ConsolidationSourceKind, ConsolidationSourceSnapshot, + }, english_gate, knowledge::{ KNOWLEDGE_PAGE_CONTRACT_SCHEMA_V1, KNOWLEDGE_PAGE_REBUILD_SCHEMA_V1, KNOWLEDGE_PAGE_SOURCE_COVERAGE_SCHEMA_V1, KNOWLEDGE_PAGE_VERSION_DIFF_SCHEMA_V1, - KnowledgePageKind, KnowledgeSourceKind, + KNOWLEDGE_PAGE_WATCH_REBUILD_SCHEMA_V1, KnowledgePageKind, KnowledgeSourceKind, }, }; use elf_storage::{ @@ -136,6 +146,35 @@ pub struct KnowledgePageSearchRequest { pub limit: Option, } +/// Request to rebuild pages affected by changed authoritative sources. +#[derive(Clone, Debug, Deserialize)] +pub struct KnowledgePageWatchRebuildRequest { + /// Tenant that owns the pages and changed sources. + pub tenant_id: String, + /// Project that owns the pages and changed sources. + pub project_id: String, + /// Agent requesting the watch/rebuild operation. + pub agent_id: String, + /// Changed source references observed by a watcher or operator. + pub changed_sources: Vec, + /// Optional page-kind filter for the affected-page lookup. + pub page_kind: Option, + /// Maximum number of affected pages to rebuild. + pub limit: Option, + #[serde(default = "default_generate_memory_candidates")] + /// Whether changed knowledge deltas should queue reviewable memory proposals. + pub generate_memory_candidates: bool, +} + +/// Changed authoritative source reference for the watch/rebuild loop. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct KnowledgePageChangedSource { + /// Changed source kind. + pub source_kind: KnowledgeSourceKind, + /// Changed source identifier. + pub source_id: Uuid, +} + /// Response returned after linting one knowledge page. #[derive(Clone, Debug, Serialize)] pub struct KnowledgePageLintResponse { @@ -152,6 +191,138 @@ pub struct KnowledgePageSearchResponse { pub items: Vec, } +/// Response returned after rebuilding pages affected by changed sources. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageWatchRebuildResponse { + /// Versioned response schema. + pub schema: String, + /// Operator-readable aggregate summary. + pub summary: KnowledgePageWatchRebuildSummary, + /// Per-page rebuild results. + pub pages: Vec, + /// Reviewable memory candidates derived from knowledge deltas. + pub memory_candidates: Vec, + /// Queued consolidation run, when memory candidates were generated. + pub proposal_run: Option, + /// One-line operator summary messages. + pub operator_summary: Vec, +} + +/// Aggregate watch/rebuild outcome counters. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageWatchRebuildSummary { + /// Changed source count after de-duplication. + pub changed_source_count: usize, + /// Knowledge pages that cited one of the changed sources. + pub affected_page_count: usize, + /// Pages rebuilt with changed derived output. + pub changed_page_count: usize, + /// Pages rebuilt with unchanged derived output. + pub unchanged_page_count: usize, + /// Pages that had stale lint findings before rebuild. + pub stale_page_count: usize, + /// Pages that could not be rebuilt. + pub blocked_page_count: usize, + /// Memory candidates generated for review. + pub memory_candidate_count: usize, +} + +/// Per-page changed-source rebuild result. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageWatchRebuildItem { + /// Knowledge page identifier. + pub page_id: Uuid, + /// Page kind. + pub page_kind: String, + /// Stable page key. + pub page_key: String, + /// Page title. + pub title: String, + /// Page rebuild state: changed, unchanged, stale, or blocked. + pub rebuild_state: String, + /// Per-section rebuild states. + pub sections: Vec, + /// Classified rebuild/lint outputs. + pub outputs: Vec, + /// Rebuilt page readback, omitted when blocked. + pub rebuilt_page: Option, + /// Blocking error text, when rebuild failed. + pub blocked_reason: Option, + /// Previous-version diff metadata, when available. + pub previous_version_diff: Option, + /// Operator-readable page summary. + pub operator_summary: String, +} + +/// Per-section rebuild state for changed-source rebuild output. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageSectionRebuildState { + /// Stable section key. + pub section_key: String, + /// Section heading. + pub heading: String, + /// Section state: changed, unchanged, stale, or blocked. + pub state: String, + /// Output types attached to the section. + pub output_types: Vec, + /// Lint finding types attached to the section before rebuild. + pub lint_finding_types: Vec, +} + +/// Classified output emitted by the watch/rebuild loop. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageRebuildOutput { + /// Output type, such as stale_section, changed_claim, missing_citation, conflict, + /// changed_source, or blocked. + pub output_type: String, + /// Severity for operator triage. + pub severity: String, + /// Associated section key, when section-scoped. + pub section_key: Option, + /// Associated source kind, when source-scoped. + pub source_kind: Option, + /// Associated source id, when source-scoped. + pub source_id: Option, + /// Human-readable output message. + pub message: String, + /// Structured reason and evidence details. + pub details: Value, +} + +/// Reviewable memory candidate produced from a knowledge delta. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgeDeltaMemoryCandidate { + /// Candidate reason, such as changed_claim or conflict. + pub reason: String, + /// Knowledge page identifier. + pub page_id: Uuid, + /// Section identifier that produced the candidate. + pub section_id: Uuid, + /// Stable section key. + pub section_key: String, + /// Source refs copied into the reviewable proposal. + pub source_refs: Vec, + /// Source snapshot summary for reviewer inspection. + pub source_snapshot: Value, + /// Reviewable proposal diff. + pub diff: ConsolidationProposalDiff, + /// Proposed memory note payload. + pub proposed_payload: Value, +} + +/// Queued reviewable proposal run produced by changed-source rebuild. +#[derive(Clone, Debug, Serialize)] +pub struct KnowledgePageProposalRunSummary { + /// Consolidation run identifier. + pub run_id: Uuid, + /// Queued worker job identifier. + pub job_id: Uuid, + /// Number of memory candidate proposals queued in the run payload. + pub proposal_count: usize, + /// Review surface for the queued candidates. + pub review_surface: String, +} + /// Summary DTO for one derived knowledge page. #[derive(Clone, Debug, Serialize)] pub struct KnowledgePageSummary { @@ -608,6 +779,10 @@ impl SourceIds { } } +struct WatchRebuildOutcome { + item: KnowledgePageWatchRebuildItem, + candidates: Vec, +} impl ElfService { /// Rebuilds and persists one derived knowledge page from explicit source ids. pub async fn knowledge_page_rebuild( @@ -701,6 +876,55 @@ impl ElfService { Ok(KnowledgePageRebuildResponse { page: self.knowledge_page_response(page).await? }) } + /// Rebuilds pages affected by changed source refs and queues reviewable candidates. + pub async fn knowledge_pages_watch_rebuild( + &self, + req: KnowledgePageWatchRebuildRequest, + ) -> Result { + validate_context(req.tenant_id.as_str(), req.project_id.as_str(), req.agent_id.as_str())?; + + let changed_sources = normalized_changed_sources(&req.changed_sources)?; + let (source_kinds, source_ids) = changed_source_arrays(&changed_sources); + let page_kind = req.page_kind.map(KnowledgePageKind::as_str); + let pages = knowledge::list_knowledge_pages_for_sources( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + page_kind, + &source_kinds, + &source_ids, + bounded_limit(req.limit), + ) + .await?; + let mut items = Vec::new(); + let mut candidates = Vec::new(); + + for page in pages { + let outcome = + self.watch_rebuild_page(req.agent_id.as_str(), page, &changed_sources).await?; + + candidates.extend(outcome.candidates); + items.push(outcome.item); + } + + let proposal_run = if req.generate_memory_candidates && !candidates.is_empty() { + Some(self.queue_knowledge_delta_candidates(&req, &changed_sources, &candidates).await?) + } else { + None + }; + let summary = watch_rebuild_summary(changed_sources.len(), &items, candidates.len()); + let operator_summary = watch_operator_summary(&summary, proposal_run.as_ref()); + + Ok(KnowledgePageWatchRebuildResponse { + schema: KNOWLEDGE_PAGE_WATCH_REBUILD_SCHEMA_V1.to_string(), + summary, + pages: items, + memory_candidates: candidates, + proposal_run, + operator_summary, + }) + } + /// Gets one derived knowledge page with sections, source refs, and lint findings. pub async fn knowledge_page_get( &self, @@ -970,6 +1194,895 @@ impl ElfService { Ok(sources.drain(..).map(|source| (source_key(&source), source)).collect()) } + + async fn watch_rebuild_page( + &self, + agent_id: &str, + page: KnowledgePage, + changed_sources: &[KnowledgePageChangedSource], + ) -> Result { + let source_refs = + knowledge::list_knowledge_page_source_refs(&self.db.pool, page.page_id).await?; + let sections = knowledge::list_knowledge_page_sections(&self.db.pool, page.page_id).await?; + let before_lint = self.watch_rebuild_lint(&page, §ions, &source_refs).await?; + let request = rebuild_request_from_page(agent_id, &page, &source_refs); + let rebuild = match request { + Ok(request) => self.knowledge_page_rebuild(request).await, + Err(err) => Err(err), + }; + + match rebuild { + Ok(response) => Ok(successful_watch_rebuild( + sections, + source_refs, + before_lint, + response.page, + changed_sources, + )), + Err(err) => Ok(blocked_watch_rebuild(page, sections, before_lint, err)), + } + } + + async fn watch_rebuild_lint( + &self, + page: &KnowledgePage, + sections: &[KnowledgePageSection], + source_refs: &[KnowledgePageSourceRef], + ) -> Result> { + let mut lint = self.lint_source_refs(page, source_refs).await?; + + lint.extend(lint_page_sections(page, sections, source_refs)); + + Ok(lint) + } + + async fn queue_knowledge_delta_candidates( + &self, + req: &KnowledgePageWatchRebuildRequest, + changed_sources: &[KnowledgePageChangedSource], + candidates: &[KnowledgeDeltaMemoryCandidate], + ) -> Result { + let source_refs = candidate_run_input_refs(candidates); + let source_snapshot = knowledge_delta_source_snapshot(changed_sources, candidates); + let lineage = ConsolidationLineage { + source_refs: source_refs.clone(), + parent_run_id: None, + parent_proposal_ids: Vec::new(), + }; + let proposals = candidates.iter().map(candidate_proposal_input).collect::>(); + let created = self + .consolidation_run_create(ConsolidationRunCreateRequest { + tenant_id: req.tenant_id.clone(), + project_id: req.project_id.clone(), + agent_id: req.agent_id.clone(), + job_kind: "manual".to_string(), + input_refs: source_refs, + source_snapshot, + lineage, + proposals, + }) + .await?; + + Ok(proposal_run_summary(created, candidates.len())) + } +} + +fn normalized_changed_sources( + changed_sources: &[KnowledgePageChangedSource], +) -> Result> { + if changed_sources.is_empty() { + return Err(Error::InvalidRequest { + message: "changed_sources must not be empty.".to_string(), + }); + } + + let mut seen = BTreeSet::new(); + let mut out = Vec::new(); + + for source in changed_sources { + if seen.insert((source.source_kind.as_str().to_string(), source.source_id)) { + out.push(source.clone()); + } + } + + Ok(out) +} + +fn changed_source_arrays( + changed_sources: &[KnowledgePageChangedSource], +) -> (Vec, Vec) { + changed_sources + .iter() + .map(|source| (source.source_kind.as_str().to_string(), source.source_id)) + .unzip() +} + +fn rebuild_request_from_page( + agent_id: &str, + page: &KnowledgePage, + source_refs: &[KnowledgePageSourceRef], +) -> Result { + let ids = SourceIds::from_source_refs(source_refs)?; + let page_kind = KnowledgePageKind::parse(page.page_kind.as_str()).ok_or_else(|| { + Error::InvalidRequest { message: "stored knowledge page kind is invalid".to_string() } + })?; + let provider_metadata = page + .rebuild_metadata + .get("provider_metadata") + .filter(|metadata| matches!(metadata, Value::Object(_))) + .cloned() + .unwrap_or_else(empty_object); + + Ok(KnowledgePageRebuildRequest { + tenant_id: page.tenant_id.clone(), + project_id: page.project_id.clone(), + agent_id: agent_id.to_string(), + page_kind, + page_key: page.page_key.clone(), + title: Some(page.title.clone()), + doc_ids: ids.doc_ids, + doc_chunk_ids: ids.doc_chunk_ids, + note_ids: ids.note_ids, + event_ids: ids.event_ids, + relation_ids: ids.relation_ids, + proposal_ids: ids.proposal_ids, + provider_metadata, + }) +} + +fn successful_watch_rebuild( + before_sections: Vec, + before_source_refs: Vec, + before_lint: Vec, + rebuilt_page: KnowledgePageResponse, + changed_sources: &[KnowledgePageChangedSource], +) -> WatchRebuildOutcome { + let previous_version_diff = rebuilt_page.page.previous_version_diff.clone(); + let outputs = rebuild_outputs( + &before_sections, + &before_source_refs, + &before_lint, + previous_version_diff.as_ref(), + changed_sources, + ); + let sections = successful_section_states(&before_sections, &rebuilt_page.sections, &outputs); + let rebuild_state = successful_rebuild_state(previous_version_diff.as_ref(), &outputs); + let candidates = memory_candidates_for_page(&rebuilt_page, &outputs); + let operator_summary = page_operator_summary( + rebuilt_page.page.page_key.as_str(), + rebuild_state.as_str(), + outputs.len(), + candidates.len(), + ); + let item = KnowledgePageWatchRebuildItem { + page_id: rebuilt_page.page.page_id, + page_kind: rebuilt_page.page.page_kind.clone(), + page_key: rebuilt_page.page.page_key.clone(), + title: rebuilt_page.page.title.clone(), + rebuild_state, + sections, + outputs, + rebuilt_page: Some(rebuilt_page), + blocked_reason: None, + previous_version_diff, + operator_summary, + }; + + WatchRebuildOutcome { item, candidates } +} + +fn blocked_watch_rebuild( + page: KnowledgePage, + sections: Vec, + before_lint: Vec, + err: Error, +) -> WatchRebuildOutcome { + let outputs = blocked_outputs(§ions, &before_lint, err.to_string().as_str()); + let section_states = blocked_section_states(§ions, &outputs); + let operator_summary = + page_operator_summary(page.page_key.as_str(), "blocked", outputs.len(), 0); + let item = KnowledgePageWatchRebuildItem { + page_id: page.page_id, + page_kind: page.page_kind, + page_key: page.page_key, + title: page.title, + rebuild_state: "blocked".to_string(), + sections: section_states, + outputs, + rebuilt_page: None, + blocked_reason: Some(err.to_string()), + previous_version_diff: previous_version_diff_from_metadata(&page.rebuild_metadata), + operator_summary, + }; + + WatchRebuildOutcome { item, candidates: Vec::new() } +} + +fn rebuild_outputs( + sections: &[KnowledgePageSection], + source_refs: &[KnowledgePageSourceRef], + lint: &[LintDraft], + diff: Option<&Value>, + changed_sources: &[KnowledgePageChangedSource], +) -> Vec { + let section_index = section_lookup(sections); + let changed_keys = diff_section_keys(diff, "changed_section_keys"); + let mut outputs = lint_outputs(lint, §ion_index); + + outputs.extend(changed_claim_outputs(sections, &changed_keys)); + outputs.extend(conflict_outputs(&outputs)); + outputs.extend(changed_source_outputs(source_refs, changed_sources)); + + outputs +} + +fn blocked_outputs( + sections: &[KnowledgePageSection], + lint: &[LintDraft], + blocked_reason: &str, +) -> Vec { + let section_index = section_lookup(sections); + let mut outputs = lint_outputs(lint, §ion_index); + + outputs.push(KnowledgePageRebuildOutput { + output_type: "blocked".to_string(), + severity: "error".to_string(), + section_key: None, + source_kind: None, + source_id: None, + message: "Knowledge page could not be rebuilt from its stored source refs.".to_string(), + details: serde_json::json!({ "blocked_reason": blocked_reason }), + }); + + outputs +} + +fn lint_outputs( + lint: &[LintDraft], + section_index: &BTreeMap, +) -> Vec { + lint.iter().filter_map(|finding| lint_output(finding, section_index)).collect() +} + +fn lint_output( + finding: &LintDraft, + section_index: &BTreeMap, +) -> Option { + let output_type = match finding.finding_type.as_str() { + "stale_source_ref" => "stale_section", + "missing_citation" | "missing_source_ref" => "missing_citation", + _ => return None, + }; + let (section_key, heading) = finding + .section_id + .and_then(|section_id| section_index.get(§ion_id)) + .cloned() + .unwrap_or_else(|| ("page".to_string(), "Page".to_string())); + + Some(KnowledgePageRebuildOutput { + output_type: output_type.to_string(), + severity: finding.severity.clone(), + section_key: Some(section_key.clone()), + source_kind: finding.source_kind.map(KnowledgeSourceKind::as_str).map(ToString::to_string), + source_id: finding.source_id, + message: lint_output_message(output_type, heading.as_str()), + details: serde_json::json!({ + "finding_type": finding.finding_type, + "section_key": section_key, + "lint_details": finding.details, + }), + }) +} + +fn changed_claim_outputs( + sections: &[KnowledgePageSection], + changed_keys: &BTreeSet, +) -> Vec { + sections + .iter() + .filter(|section| changed_keys.contains(section.section_key.as_str())) + .map(|section| KnowledgePageRebuildOutput { + output_type: "changed_claim".to_string(), + severity: "info".to_string(), + section_key: Some(section.section_key.clone()), + source_kind: None, + source_id: None, + message: format!( + "Knowledge page section '{}' changed after rebuilding from current sources.", + section.heading + ), + details: serde_json::json!({ + "section_key": section.section_key, + "section_hash": section.content_hash, + }), + }) + .collect() +} + +fn changed_source_outputs( + source_refs: &[KnowledgePageSourceRef], + changed_sources: &[KnowledgePageChangedSource], +) -> Vec { + let changed = changed_source_set(changed_sources); + + source_refs + .iter() + .filter(|source_ref| { + changed.contains(&(source_ref.source_kind.clone(), source_ref.source_id)) + }) + .map(|source_ref| KnowledgePageRebuildOutput { + output_type: "changed_source".to_string(), + severity: "info".to_string(), + section_key: None, + source_kind: Some(source_ref.source_kind.clone()), + source_id: Some(source_ref.source_id), + message: "Changed source is attached to this knowledge page.".to_string(), + details: serde_json::json!({ + "source_kind": source_ref.source_kind, + "source_id": source_ref.source_id, + "section_id": source_ref.section_id, + }), + }) + .collect() +} + +fn conflict_outputs(outputs: &[KnowledgePageRebuildOutput]) -> Vec { + let stale = output_section_keys(outputs, "stale_section"); + let changed = output_section_keys(outputs, "changed_claim"); + + stale + .intersection(&changed) + .map(|section_key| { + KnowledgePageRebuildOutput { + output_type: "conflict".to_string(), + severity: "warning".to_string(), + section_key: Some(section_key.clone()), + source_kind: None, + source_id: None, + message: + "Stored derived section was stale and changed after rebuilding from current sources." + .to_string(), + details: serde_json::json!({ + "section_key": section_key, + "reason": "stale_snapshot_changed_claim", + }), + } + }) + .collect() +} + +fn successful_section_states( + before_sections: &[KnowledgePageSection], + rebuilt_sections: &[KnowledgePageSectionResponse], + outputs: &[KnowledgePageRebuildOutput], +) -> Vec { + let output_map = outputs_by_section(outputs); + let before_by_key = before_sections + .iter() + .map(|section| (section.section_key.as_str(), section)) + .collect::>(); + + rebuilt_sections + .iter() + .map(|section| { + let output_types = + output_map.get(section.section_key.as_str()).cloned().unwrap_or_default(); + let lint_finding_types = lint_finding_types_for_outputs(&output_types); + let state = section_state( + before_by_key.get(section.section_key.as_str()).copied(), + section, + &output_types, + ); + + KnowledgePageSectionRebuildState { + section_key: section.section_key.clone(), + heading: section.heading.clone(), + state, + output_types, + lint_finding_types, + } + }) + .collect() +} + +fn blocked_section_states( + sections: &[KnowledgePageSection], + outputs: &[KnowledgePageRebuildOutput], +) -> Vec { + let output_map = outputs_by_section(outputs); + + sections + .iter() + .map(|section| { + let output_types = + output_map.get(section.section_key.as_str()).cloned().unwrap_or_default(); + let lint_finding_types = lint_finding_types_for_outputs(&output_types); + let state = if output_types.iter().any(|kind| kind == "missing_citation") { + "blocked" + } else if output_types.iter().any(|kind| kind == "stale_section") { + "stale" + } else { + "blocked" + }; + + KnowledgePageSectionRebuildState { + section_key: section.section_key.clone(), + heading: section.heading.clone(), + state: state.to_string(), + output_types, + lint_finding_types, + } + }) + .collect() +} + +fn section_state( + before: Option<&KnowledgePageSection>, + after: &KnowledgePageSectionResponse, + output_types: &[String], +) -> String { + if output_types.iter().any(|kind| kind == "missing_citation") { + return "blocked".to_string(); + } + if before.is_some_and(|section| section.content_hash != after.content_hash) + || output_types.iter().any(|kind| kind == "changed_claim" || kind == "conflict") + { + return "changed".to_string(); + } + + if output_types.iter().any(|kind| kind == "stale_section") { + return "stale".to_string(); + } + + "unchanged".to_string() +} + +fn successful_rebuild_state( + diff: Option<&Value>, + outputs: &[KnowledgePageRebuildOutput], +) -> String { + if diff_content_changed(diff) { + return "changed".to_string(); + } + + if outputs.iter().any(|output| output.output_type == "stale_section") { + return "stale".to_string(); + } + + "unchanged".to_string() +} + +fn memory_candidates_for_page( + page: &KnowledgePageResponse, + outputs: &[KnowledgePageRebuildOutput], +) -> Vec { + let reasons = candidate_reasons_by_section(outputs); + + page.sections + .iter() + .filter_map(|section| { + let reason = reasons.get(section.section_key.as_str())?; + + memory_candidate_for_section(page, section, reason.as_str()) + }) + .collect() +} + +fn memory_candidate_for_section( + page: &KnowledgePageResponse, + section: &KnowledgePageSectionResponse, + reason: &str, +) -> Option { + let source_refs = page + .source_refs + .iter() + .filter(|source_ref| source_ref.section_id == Some(section.section_id)) + .filter_map(|source_ref| consolidation_input_ref(source_ref, page, section, reason)) + .collect::>(); + + if source_refs.is_empty() { + return None; + } + + let source_snapshot = candidate_source_snapshot(page, section, reason, &source_refs); + let diff = candidate_diff(page, section, reason); + let proposed_payload = candidate_proposed_payload(page, section, reason); + + Some(KnowledgeDeltaMemoryCandidate { + reason: reason.to_string(), + page_id: page.page.page_id, + section_id: section.section_id, + section_key: section.section_key.clone(), + source_refs, + source_snapshot, + diff, + proposed_payload, + }) +} + +fn consolidation_input_ref( + source_ref: &KnowledgePageSourceRefResponse, + page: &KnowledgePageResponse, + section: &KnowledgePageSectionResponse, + reason: &str, +) -> Option { + let kind = consolidation_source_kind(source_ref.source_kind.as_str())?; + + Some(ConsolidationInputRef { + kind, + id: source_ref.source_id, + snapshot: ConsolidationSourceSnapshot { + status: source_ref.source_status.clone(), + updated_at: source_ref.source_updated_at, + content_hash: source_ref.source_content_hash.clone(), + embedding_version: None, + trace_version: None, + source_ref: source_ref.source_snapshot.clone(), + metadata: serde_json::json!({ + "schema": "elf.knowledge_delta.source_ref/v1", + "reason": reason, + "page_id": page.page.page_id, + "page_kind": page.page.page_kind, + "page_key": page.page.page_key, + "section_id": section.section_id, + "section_key": section.section_key, + }), + }, + }) +} + +fn consolidation_source_kind(source_kind: &str) -> Option { + match KnowledgeSourceKind::parse(source_kind)? { + KnowledgeSourceKind::Doc => Some(ConsolidationSourceKind::Doc), + KnowledgeSourceKind::DocChunk => Some(ConsolidationSourceKind::DocChunk), + KnowledgeSourceKind::Note => Some(ConsolidationSourceKind::Note), + KnowledgeSourceKind::Event => Some(ConsolidationSourceKind::Event), + KnowledgeSourceKind::Relation | KnowledgeSourceKind::Proposal => None, + } +} + +fn candidate_source_snapshot( + page: &KnowledgePageResponse, + section: &KnowledgePageSectionResponse, + reason: &str, + source_refs: &[ConsolidationInputRef], +) -> Value { + serde_json::json!({ + "schema": "elf.knowledge_delta.source_snapshot/v1", + "reason": reason, + "page": { + "page_id": page.page.page_id, + "page_kind": page.page.page_kind, + "page_key": page.page.page_key, + "content_hash": page.page.content_hash, + "rebuild_source_hash": page.page.rebuild_source_hash, + "previous_version_diff": page.page.previous_version_diff, + }, + "section": { + "section_id": section.section_id, + "section_key": section.section_key, + "heading": section.heading, + "content_hash": section.content_hash, + "citation_count": section.citation_count, + "source_ref_count": section.source_ref_count, + }, + "source_ref_count": source_refs.len(), + "source_mutation_allowed": false, + }) +} + +fn candidate_diff( + page: &KnowledgePageResponse, + section: &KnowledgePageSectionResponse, + reason: &str, +) -> ConsolidationProposalDiff { + ConsolidationProposalDiff { + summary: format!( + "Create a reviewable memory candidate for knowledge page '{}' section '{}' because {reason}.", + page.page.page_key, section.section_key + ), + before: serde_json::json!({ + "page_id": page.page.page_id, + "section_id": section.section_id, + "previous_version_diff": page.page.previous_version_diff, + }), + after: serde_json::json!({ + "target": "derived_note", + "reason": reason, + "page_id": page.page.page_id, + "section_id": section.section_id, + "section_key": section.section_key, + }), + } +} + +fn candidate_proposed_payload( + page: &KnowledgePageResponse, + section: &KnowledgePageSectionResponse, + reason: &str, +) -> Value { + let text = truncate_chars( + format!( + "Plan: Review knowledge page {} section {} because source changes produced a {reason} delta.", + page.page.page_key, section.section_key + ) + .as_str(), + 220, + ); + + serde_json::json!({ + "type": "plan", + "key": format!( + "knowledge_delta_{}_{}", + page.page.page_key.replace('-', "_"), + section.section_key.replace('-', "_") + ), + "text": text, + "scope": "project_shared", + "importance": 0.65, + "confidence": 0.72, + "source_ref": { + "schema": "elf.knowledge_delta/v1", + "reason": reason, + "page_id": page.page.page_id, + "section_id": section.section_id, + "page_key": page.page.page_key, + "section_key": section.section_key, + "source_mutation_allowed": false, + } + }) +} + +fn candidate_proposal_input( + candidate: &KnowledgeDeltaMemoryCandidate, +) -> ConsolidationProposalInput { + ConsolidationProposalInput { + proposal_kind: "knowledge_delta_memory_candidate".to_string(), + apply_intent: ConsolidationApplyIntent::CreateDerivedNote, + source_refs: candidate.source_refs.clone(), + source_snapshot: candidate.source_snapshot.clone(), + lineage: ConsolidationLineage { + source_refs: candidate.source_refs.clone(), + parent_run_id: None, + parent_proposal_ids: Vec::new(), + }, + confidence: 0.72, + unsupported_claim_flags: Vec::new(), + markers: candidate_markers(candidate), + diff: candidate.diff.clone(), + target_ref: empty_object(), + proposed_payload: candidate.proposed_payload.clone(), + } +} + +fn candidate_markers(candidate: &KnowledgeDeltaMemoryCandidate) -> ConsolidationMarkers { + let marker = ConsolidationMarker { + severity: ConsolidationMarkerSeverity::Medium, + message: format!( + "Knowledge delta '{}' requires reviewer confirmation before memory promotion.", + candidate.reason + ), + source: candidate.source_refs.first().cloned(), + }; + + if candidate.reason == "conflict" { + ConsolidationMarkers { contradictions: vec![marker], staleness: Vec::new() } + } else { + ConsolidationMarkers { contradictions: Vec::new(), staleness: vec![marker] } + } +} + +fn candidate_run_input_refs( + candidates: &[KnowledgeDeltaMemoryCandidate], +) -> Vec { + let mut seen = BTreeSet::new(); + let mut out = Vec::new(); + + for source_ref in candidates.iter().flat_map(|candidate| &candidate.source_refs) { + if seen.insert((source_ref.kind.as_str().to_string(), source_ref.id)) { + out.push(source_ref.clone()); + } + } + + out +} + +fn knowledge_delta_source_snapshot( + changed_sources: &[KnowledgePageChangedSource], + candidates: &[KnowledgeDeltaMemoryCandidate], +) -> Value { + serde_json::json!({ + "schema": "elf.knowledge_delta.run_source_snapshot/v1", + "changed_sources": changed_sources, + "candidate_count": candidates.len(), + "candidate_reasons": candidates + .iter() + .map(|candidate| candidate.reason.clone()) + .collect::>(), + "source_mutation_allowed": false, + }) +} + +fn proposal_run_summary( + created: ConsolidationRunCreateResponse, + proposal_count: usize, +) -> KnowledgePageProposalRunSummary { + KnowledgePageProposalRunSummary { + run_id: created.run.run_id, + job_id: created.job_id, + proposal_count, + review_surface: "consolidation_proposals".to_string(), + } +} + +fn watch_rebuild_summary( + changed_source_count: usize, + items: &[KnowledgePageWatchRebuildItem], + memory_candidate_count: usize, +) -> KnowledgePageWatchRebuildSummary { + KnowledgePageWatchRebuildSummary { + changed_source_count, + affected_page_count: items.len(), + changed_page_count: items.iter().filter(|item| item.rebuild_state == "changed").count(), + unchanged_page_count: items.iter().filter(|item| item.rebuild_state == "unchanged").count(), + stale_page_count: items + .iter() + .filter(|item| item.outputs.iter().any(|output| output.output_type == "stale_section")) + .count(), + blocked_page_count: items.iter().filter(|item| item.rebuild_state == "blocked").count(), + memory_candidate_count, + } +} + +fn watch_operator_summary( + summary: &KnowledgePageWatchRebuildSummary, + proposal_run: Option<&KnowledgePageProposalRunSummary>, +) -> Vec { + let mut out = vec![format!( + "Changed-source rebuild inspected {} sources and {} affected knowledge pages.", + summary.changed_source_count, summary.affected_page_count + )]; + + out.push(format!( + "Page states: changed={}, unchanged={}, stale={}, blocked={}.", + summary.changed_page_count, + summary.unchanged_page_count, + summary.stale_page_count, + summary.blocked_page_count + )); + out.push(format!( + "Generated {} reviewable memory candidate proposals; source mutation remains disabled.", + summary.memory_candidate_count + )); + + if let Some(run) = proposal_run { + out.push(format!( + "Queued consolidation run {} with {} proposal payloads for review.", + run.run_id, run.proposal_count + )); + } + + out +} + +fn page_operator_summary( + page_key: &str, + rebuild_state: &str, + output_count: usize, + candidate_count: usize, +) -> String { + format!( + "Knowledge page '{page_key}' rebuild_state={rebuild_state}, outputs={output_count}, memory_candidates={candidate_count}." + ) +} + +fn section_lookup(sections: &[KnowledgePageSection]) -> BTreeMap { + sections + .iter() + .map(|section| (section.section_id, (section.section_key.clone(), section.heading.clone()))) + .collect() +} + +fn diff_section_keys(diff: Option<&Value>, key: &str) -> BTreeSet { + diff.and_then(|value| value.get(key)) + .and_then(Value::as_array) + .map(|items| items.iter().filter_map(Value::as_str).map(ToString::to_string).collect()) + .unwrap_or_default() +} + +fn diff_content_changed(diff: Option<&Value>) -> bool { + diff.and_then(|value| value.get("content_changed")).and_then(Value::as_bool).unwrap_or(false) + || !diff_section_keys(diff, "added_section_keys").is_empty() + || !diff_section_keys(diff, "removed_section_keys").is_empty() + || !diff_section_keys(diff, "changed_section_keys").is_empty() +} + +fn changed_source_set(changed_sources: &[KnowledgePageChangedSource]) -> BTreeSet<(String, Uuid)> { + changed_sources + .iter() + .map(|source| (source.source_kind.as_str().to_string(), source.source_id)) + .collect() +} + +fn output_section_keys( + outputs: &[KnowledgePageRebuildOutput], + output_type: &str, +) -> BTreeSet { + outputs + .iter() + .filter(|output| output.output_type == output_type) + .filter_map(|output| output.section_key.clone()) + .collect() +} + +fn outputs_by_section(outputs: &[KnowledgePageRebuildOutput]) -> BTreeMap<&str, Vec> { + let mut map = BTreeMap::<&str, Vec>::new(); + + for output in outputs { + let Some(section_key) = output.section_key.as_deref() else { + continue; + }; + + map.entry(section_key).or_default().push(output.output_type.clone()); + } + for values in map.values_mut() { + values.sort(); + values.dedup(); + } + + map +} + +fn lint_finding_types_for_outputs(output_types: &[String]) -> Vec { + let mut out = output_types + .iter() + .filter_map(|output_type| match output_type.as_str() { + "stale_section" => Some("stale_source_ref".to_string()), + "missing_citation" => Some("missing_citation".to_string()), + _ => None, + }) + .collect::>(); + + out.sort(); + out.dedup(); + + out +} + +fn candidate_reasons_by_section(outputs: &[KnowledgePageRebuildOutput]) -> BTreeMap<&str, String> { + let mut reasons = BTreeMap::<&str, String>::new(); + + for output in outputs { + let Some(section_key) = output.section_key.as_deref() else { + continue; + }; + + match output.output_type.as_str() { + "conflict" => { + reasons.insert(section_key, "conflict".to_string()); + }, + "changed_claim" => { + reasons.entry(section_key).or_insert_with(|| "changed_claim".to_string()); + }, + _ => {}, + } + } + + reasons +} + +fn lint_output_message(output_type: &str, heading: &str) -> String { + match output_type { + "stale_section" => + format!("Knowledge page section '{heading}' cites a stale or missing source."), + "missing_citation" => + format!("Knowledge page section '{heading}' is missing citation coverage."), + _ => format!("Knowledge page section '{heading}' needs operator review."), + } +} + +fn default_generate_memory_candidates() -> bool { + true } fn source_snapshots( @@ -2274,10 +3387,12 @@ async fn insert_lint_finding( #[cfg(test)] mod tests { use crate::knowledge::{ - self, DraftSection, KnowledgePage, KnowledgePageKind, KnowledgePageSearchRow, - KnowledgePageSection, KnowledgePageSourceRef, KnowledgeSourceKind, OffsetDateTime, - SourceSnapshot, Uuid, + self, DraftSection, KnowledgeDeltaMemoryCandidate, KnowledgePage, KnowledgePageKind, + KnowledgePageResponse, KnowledgePageSearchRow, KnowledgePageSection, + KnowledgePageSectionResponse, KnowledgePageSourceRef, KnowledgePageSourceRefResponse, + KnowledgePageSummary, KnowledgeSourceKind, LintDraft, OffsetDateTime, SourceSnapshot, Uuid, }; + use elf_domain::consolidation::ConsolidationApplyIntent; fn test_source(kind: KnowledgeSourceKind, raw_id: u128, line: &str) -> SourceSnapshot { let id = Uuid::from_u128(raw_id); @@ -2497,6 +3612,80 @@ mod tests { assert_eq!(finding.source_id, Some(source_id)); } + #[test] + fn watch_rebuild_outputs_cover_source_update_and_stale_page() { + let section_id = Uuid::from_u128(50); + let source_id = Uuid::from_u128(51); + let section = test_section( + section_id, + "source-notes", + serde_json::json!([{ "source_kind": "note", "source_id": source_id }]), + None, + ); + let source_ref = test_source_ref_for(section_id, source_id, "old-hash"); + let lint = vec![LintDraft { + section_id: Some(section_id), + finding_type: "stale_source_ref".to_string(), + severity: "warning".to_string(), + source_kind: Some(KnowledgeSourceKind::Note), + source_id: Some(source_id), + message: "Knowledge page source reference snapshot is stale.".to_string(), + details: serde_json::json!({ "stored": "old", "current": "new" }), + }]; + let diff = serde_json::json!({ + "available": true, + "content_changed": true, + "changed_section_keys": ["source-notes"] + }); + let changed_sources = vec![knowledge::KnowledgePageChangedSource { + source_kind: KnowledgeSourceKind::Note, + source_id, + }]; + let outputs = knowledge::rebuild_outputs( + &[section], + &[source_ref], + &lint, + Some(&diff), + &changed_sources, + ); + let output_types = + outputs.iter().map(|output| output.output_type.as_str()).collect::>(); + + assert!(output_types.contains(&"stale_section")); + assert!(output_types.contains(&"changed_claim")); + assert!(output_types.contains(&"conflict")); + assert!(output_types.contains(&"changed_source")); + } + + #[test] + fn memory_candidate_uses_reviewable_consolidation_proposal_contract() { + let section_id = Uuid::from_u128(60); + let source_id = Uuid::from_u128(61); + let page = test_page_response(section_id, source_id); + let outputs = vec![knowledge::KnowledgePageRebuildOutput { + output_type: "changed_claim".to_string(), + severity: "info".to_string(), + section_key: Some("source-notes".to_string()), + source_kind: Some("note".to_string()), + source_id: Some(source_id), + message: "Changed section.".to_string(), + details: serde_json::json!({ "reason": "source_update" }), + }]; + let candidates = knowledge::memory_candidates_for_page(&page, &outputs); + + assert_eq!(candidates.len(), 1); + + assert_candidate_is_reviewable(&candidates[0]); + + let proposal = knowledge::candidate_proposal_input(&candidates[0]); + + assert_eq!(proposal.apply_intent, ConsolidationApplyIntent::CreateDerivedNote); + assert_eq!(proposal.source_refs.len(), 1); + assert_eq!(proposal.proposed_payload["source_ref"]["source_mutation_allowed"], false); + assert_eq!(proposal.proposed_payload["source_ref"]["reason"], "changed_claim"); + assert!(!proposal.markers.staleness.is_empty()); + } + #[test] fn lint_page_sections_detects_unsupported_missing_and_low_coverage() { let page = test_page(); @@ -2616,18 +3805,64 @@ mod tests { } fn test_source_ref(section_id: Uuid) -> KnowledgePageSourceRef { + test_source_ref_for(section_id, Uuid::from_u128(31), "source-hash") + } + + fn test_source_ref_for( + section_id: Uuid, + source_id: Uuid, + source_hash: &str, + ) -> KnowledgePageSourceRef { KnowledgePageSourceRef { ref_id: Uuid::from_u128(30), page_id: Uuid::from_u128(21), section_id: Some(section_id), source_kind: "note".to_string(), - source_id: Uuid::from_u128(31), + source_id, source_status: Some("active".to_string()), source_updated_at: Some(OffsetDateTime::UNIX_EPOCH), - source_content_hash: Some("source-hash".to_string()), - source_snapshot: serde_json::json!({}), + source_content_hash: Some(source_hash.to_string()), + source_snapshot: serde_json::json!({ + "schema": "test_source/v1", + "source_id": source_id, + "content_hash": source_hash, + }), citation_metadata: serde_json::json!({}), created_at: OffsetDateTime::UNIX_EPOCH, } } + + fn test_page_response(section_id: Uuid, source_id: Uuid) -> KnowledgePageResponse { + let page = test_page(); + let section = test_section( + section_id, + "source-notes", + serde_json::json!([{ "source_kind": "note", "source_id": source_id }]), + None, + ); + let source_ref = test_source_ref_for(section_id, source_id, "new-hash"); + + KnowledgePageResponse { + page: KnowledgePageSummary::from(page), + sections: vec![KnowledgePageSectionResponse { + citation_count: 1, + source_ref_count: 1, + coverage_complete: true, + source_backlinks: Vec::new(), + ..KnowledgePageSectionResponse::from(section) + }], + source_refs: vec![KnowledgePageSourceRefResponse::from(source_ref)], + lint_findings: Vec::new(), + } + } + + fn assert_candidate_is_reviewable(candidate: &KnowledgeDeltaMemoryCandidate) { + assert_eq!(candidate.reason, "changed_claim"); + assert_eq!(candidate.source_refs.len(), 1); + assert_eq!(candidate.source_refs[0].kind.as_str(), "note"); + assert_eq!(candidate.source_snapshot["source_mutation_allowed"], false); + assert_eq!(candidate.diff.after["reason"], "changed_claim"); + assert_eq!(candidate.proposed_payload["type"], "plan"); + assert_eq!(candidate.proposed_payload["source_ref"]["schema"], "elf.knowledge_delta/v1"); + } } diff --git a/packages/elf-service/src/lib.rs b/packages/elf-service/src/lib.rs index 50e3622..7a8190d 100644 --- a/packages/elf-service/src/lib.rs +++ b/packages/elf-service/src/lib.rs @@ -95,12 +95,15 @@ pub use self::{ AdminIngestionProfilesListResponse, IngestionProfileRef, IngestionProfileSelector, }, knowledge::{ - KnowledgePageGetRequest, KnowledgePageLintFindingResponse, KnowledgePageLintRequest, - KnowledgePageLintResponse, KnowledgePageLintSummary, KnowledgePageRebuildRequest, - KnowledgePageRebuildResponse, KnowledgePageResponse, KnowledgePageSearchItem, - KnowledgePageSearchRequest, KnowledgePageSearchResponse, KnowledgePageSectionResponse, + KnowledgeDeltaMemoryCandidate, KnowledgePageChangedSource, KnowledgePageGetRequest, + KnowledgePageLintFindingResponse, KnowledgePageLintRequest, KnowledgePageLintResponse, + KnowledgePageLintSummary, KnowledgePageProposalRunSummary, KnowledgePageRebuildOutput, + KnowledgePageRebuildRequest, KnowledgePageRebuildResponse, KnowledgePageResponse, + KnowledgePageSearchItem, KnowledgePageSearchRequest, KnowledgePageSearchResponse, + KnowledgePageSectionRebuildState, KnowledgePageSectionResponse, KnowledgePageSectionSourceBacklink, KnowledgePageSourceRefResponse, KnowledgePageSummary, - KnowledgePagesListRequest, KnowledgePagesListResponse, + KnowledgePageWatchRebuildRequest, KnowledgePageWatchRebuildResponse, + KnowledgePageWatchRebuildSummary, KnowledgePagesListRequest, KnowledgePagesListResponse, }, list::{ListItem, ListRequest, ListResponse}, memory_corrections::{ diff --git a/packages/elf-storage/src/knowledge.rs b/packages/elf-storage/src/knowledge.rs index d1f0fd9..94902ab 100644 --- a/packages/elf-storage/src/knowledge.rs +++ b/packages/elf-storage/src/knowledge.rs @@ -743,6 +743,66 @@ LIMIT $4", Ok(rows) } +/// Lists knowledge pages that cite at least one changed source. +pub async fn list_knowledge_pages_for_sources<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + page_kind: Option<&str>, + source_kinds: &[String], + source_ids: &[Uuid], + limit: i64, +) -> Result> +where + E: PgExecutor<'e>, +{ + if source_kinds.is_empty() || source_ids.is_empty() { + return Ok(Vec::new()); + } + + let rows = sqlx::query_as::<_, KnowledgePage>( + "\ +SELECT DISTINCT + p.page_id, + p.tenant_id, + p.project_id, + p.page_kind, + p.page_key, + p.title, + p.contract_schema, + p.status, + p.rebuild_source_hash, + p.content_hash, + p.source_coverage, + p.source_snapshot, + p.rebuild_metadata, + p.created_at, + p.updated_at, + p.rebuilt_at +FROM knowledge_pages p +JOIN knowledge_page_source_refs r ON r.page_id = p.page_id +JOIN unnest($4::text[], $5::uuid[]) AS changed(source_kind, source_id) + ON changed.source_kind = r.source_kind + AND changed.source_id = r.source_id +WHERE p.tenant_id = $1 + AND p.project_id = $2 + AND ($3::text IS NULL OR p.page_kind = $3) + AND p.status IN ('active', 'stale') +ORDER BY p.updated_at DESC, p.page_id DESC +LIMIT $6", + ) + .bind(tenant_id) + .bind(project_id) + .bind(page_kind) + .bind(source_kinds) + .bind(source_ids) + .bind(limit) + .fetch_all(executor) + .await?; + + Ok(rows) +} + /// Lists sections for one knowledge page. pub async fn list_knowledge_page_sections<'e, E>( executor: E,