From ed3c58f8357f0de46c1f27d344fe81558a32c3cd Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Mon, 8 Jun 2026 14:46:00 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Add reviewable consolidation proposal store and job contract","authority":"XY-800"} --- Cargo.lock | 1 + docs/spec/index.md | 7 + .../spec/system_consolidation_proposals_v1.md | 209 ++++++ packages/elf-domain/Cargo.toml | 2 + packages/elf-domain/src/consolidation.rs | 520 +++++++++++++++ packages/elf-domain/src/lib.rs | 1 + packages/elf-domain/tests/consolidation.rs | 157 +++++ packages/elf-service/src/consolidation.rs | 622 ++++++++++++++++++ packages/elf-service/src/lib.rs | 8 + packages/elf-storage/src/consolidation.rs | 446 +++++++++++++ packages/elf-storage/src/lib.rs | 1 + packages/elf-storage/src/models.rs | 84 +++ packages/elf-storage/src/schema.rs | 4 + packages/elf-storage/tests/db_smoke.rs | 18 + sql/init.sql | 2 + sql/tables/031_consolidation_runs.sql | 52 ++ sql/tables/032_consolidation_proposals.sql | 106 +++ 17 files changed, 2240 insertions(+) create mode 100644 docs/spec/system_consolidation_proposals_v1.md create mode 100644 packages/elf-domain/src/consolidation.rs create mode 100644 packages/elf-domain/tests/consolidation.rs create mode 100644 packages/elf-service/src/consolidation.rs create mode 100644 packages/elf-storage/src/consolidation.rs create mode 100644 sql/tables/031_consolidation_runs.sql create mode 100644 sql/tables/032_consolidation_proposals.sql diff --git a/Cargo.lock b/Cargo.lock index d810614e..ccd3b168 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -956,6 +956,7 @@ dependencies = [ "time", "unicode-normalization", "unicode-script", + "uuid", "whatlang", ] diff --git a/docs/spec/index.md b/docs/spec/index.md index 10eeb638..ba425c19 100644 --- a/docs/spec/index.md +++ b/docs/spec/index.md @@ -29,6 +29,13 @@ Question this index answers: "what must remain true?" - State transitions and protocol rules. - Behavior that tests, code, or operators should treat as authoritative. +## Documents + +- `system_elf_memory_service_v2.md`: Core ELF memory service contract, API semantics, + and storage invariants. +- `system_consolidation_proposals_v1.md`: Reviewable derived consolidation run and + proposal contract over immutable source evidence. + ## Spec document contract Start each spec with a compact routing header: diff --git a/docs/spec/system_consolidation_proposals_v1.md b/docs/spec/system_consolidation_proposals_v1.md new file mode 100644 index 00000000..ff27cd1a --- /dev/null +++ b/docs/spec/system_consolidation_proposals_v1.md @@ -0,0 +1,209 @@ +# Consolidation Proposals v1 Specification + +Purpose: Define the reviewable consolidation run and proposal contract for derived memory output. +Status: normative +Read this when: You are implementing, validating, or reviewing dreaming-inspired consolidation storage, jobs, proposals, or review flows. +Not this document: Live LLM consolidation generation, viewer UI behavior, retrieval observability panels, or agentmemory import adapters. +Defines: `elf.consolidation/v1` runs, proposals, source snapshots, lineage, review lifecycle, and source immutability rules. + +Related inputs: + +- `docs/research/2026-06-08-agent-memory-selection.json` +- `docs/guide/research/comparison_external_projects.md` +- `docs/spec/system_elf_memory_service_v2.md` + +## Core Rule + +Consolidation output is derived and reviewable. It must never destructively rewrite +authoritative source notes, events, docs, traces, graph facts, or search traces. + +The authoritative source-of-truth remains the ELF Core storage defined by +`docs/spec/system_elf_memory_service_v2.md`. Consolidation stores proposals over +immutable input snapshots. A proposal may later create or update a derived artifact, +but source evidence remains inspectable and unchanged. + +## Contract Schema + +Canonical schema identifier: + +```text +elf.consolidation/v1 +``` + +Every persisted run and proposal must carry `contract_schema = "elf.consolidation/v1"`. + +## Source References + +`source_refs` is a non-empty array of immutable input pointers. + +Each item has: + +- `kind`: one of `note`, `event`, `trace`, `trace_item`, `doc`, `doc_chunk` +- `id`: UUID of the referenced source artifact +- `snapshot`: source snapshot metadata captured before proposal storage + +`snapshot` must contain at least one freshness or replay guard: + +- `status` +- `updated_at` +- `content_hash` +- `embedding_version` +- `trace_version` +- non-empty `source_ref` +- non-empty `metadata` + +`source_ref` and `metadata` must be JSON objects. + +## Run Contract + +Storage table: `consolidation_runs`. + +Required fields: + +- `run_id` +- `tenant_id` +- `project_id` +- `agent_id` +- `contract_schema` +- `job_kind` +- `status` +- `input_refs` +- `source_snapshot` +- `lineage` +- `error` +- `created_at` +- `updated_at` +- `completed_at` + +`job_kind` identifies how the run was registered, for example `fixture`, `manual`, or +future `scheduled`. This issue only permits fixture-driven or manually supplied +proposal payloads. It does not permit live provider generation. + +Run states: + +- `pending` +- `running` +- `completed` +- `failed` +- `cancelled` + +Allowed run transitions: + +- `pending -> running` +- `pending -> cancelled` +- `running -> completed` +- `running -> failed` +- `running -> cancelled` + +Terminal states are `completed`, `failed`, and `cancelled`. + +## Proposal Contract + +Storage table: `consolidation_proposals`. + +Required fields: + +- `proposal_id` +- `run_id` +- `tenant_id` +- `project_id` +- `agent_id` +- `contract_schema` +- `proposal_kind` +- `apply_intent` +- `review_state` +- `source_refs` +- `source_snapshot` +- `lineage` +- `diff` +- `confidence` +- `contradiction_markers` +- `staleness_markers` +- `target_ref` +- `proposed_payload` +- `reviewer_agent_id` +- `review_comment` +- `reviewed_at` +- `created_at` +- `updated_at` + +`confidence` must be finite and in the inclusive range `0.0..=1.0`. + +`lineage` must include non-empty `source_refs`. It may also include `parent_run_id` +and `parent_proposal_ids`. + +`contradiction_markers` and `staleness_markers` are review prompts. Each marker has: + +- `severity`: `low`, `medium`, or `high` +- `message`: non-empty reviewer-facing text +- `source`: optional source reference + +## Diff And Apply Intent + +`diff` is a JSON object with: + +- `summary`: non-empty text +- `before`: JSON object +- `after`: JSON object + +The diff must describe a derived output change. It must not include source mutation +keys such as `source_mutation`, `source_mutations`, `source_note_updates`, +`delete_source`, `delete_sources`, `source_delete`, or `overwrite_source`. + +Allowed `apply_intent` values: + +- `create_derived_note` +- `update_derived_note` +- `create_derived_knowledge_page` +- `update_derived_knowledge_page` +- `create_derived_graph_view` +- `no_op` + +No `apply_intent` may update, delete, overwrite, or deprecate authoritative source +notes, docs, events, traces, or graph facts. + +## Review Lifecycle + +Review states: + +- `proposed` +- `approved` +- `rejected` +- `applied` +- `archived` + +Allowed review transitions: + +- `proposed -> approved` +- `proposed -> rejected` +- `proposed -> archived` +- `approved -> applied` +- `approved -> rejected` +- `approved -> archived` + +Terminal states are `rejected`, `applied`, and `archived`. + +`applied` means the proposal has been approved and marked as applied to the derived +target. It does not mean authoritative source memory was changed. + +## Service Boundary + +The first implementation exposes fixture-driven service flows: + +- create a consolidation run with optional proposal payloads +- list consolidation runs +- get a consolidation run +- list consolidation proposals +- get a consolidation proposal +- transition proposal review state + +These flows must not call LLM, embedding, rerank, or external provider adapters. + +## Future Connections + +Future viewer work should render proposals as reviewable records with source refs, +snapshots, lineage, diff, confidence, contradiction markers, and staleness markers. + +Future derived knowledge pages may use approved proposals as input, but those pages +remain rebuildable derived output. They must retain source pointers and must not become +a hidden replacement for evidence-bound ELF Core memory. diff --git a/packages/elf-domain/Cargo.toml b/packages/elf-domain/Cargo.toml index dda8fdcf..25c4d732 100644 --- a/packages/elf-domain/Cargo.toml +++ b/packages/elf-domain/Cargo.toml @@ -6,9 +6,11 @@ version = "0.2.0" [dependencies] regex = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } time = { workspace = true } unicode-normalization = { workspace = true } unicode-script = { workspace = true } +uuid = { workspace = true } whatlang = { workspace = true } elf-config = { workspace = true } diff --git a/packages/elf-domain/src/consolidation.rs b/packages/elf-domain/src/consolidation.rs new file mode 100644 index 00000000..cd957554 --- /dev/null +++ b/packages/elf-domain/src/consolidation.rs @@ -0,0 +1,520 @@ +//! Consolidation proposal contract validation. + +use std::{ + error::Error, + fmt::{Display, Formatter}, +}; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use time::OffsetDateTime; +use uuid::Uuid; + +/// Current consolidation contract schema identifier. +pub const CONSOLIDATION_CONTRACT_SCHEMA_V1: &str = "elf.consolidation/v1"; + +const FORBIDDEN_DIFF_KEYS: [&str; 7] = [ + "delete_source", + "delete_sources", + "source_delete", + "source_mutation", + "source_mutations", + "source_note_updates", + "overwrite_source", +]; + +/// Error returned by consolidation contract validation. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum ConsolidationValidationError { + /// A required source reference list was empty. + MissingSourceRefs, + /// A source snapshot did not include any immutable freshness guard. + MissingSourceSnapshot, + /// A JSON field was not the required object shape. + InvalidJsonObject { + /// Name of the invalid field. + field: &'static str, + }, + /// A required text field was empty. + EmptyText { + /// Name of the invalid field. + field: &'static str, + }, + /// A confidence value was outside the inclusive range 0.0 to 1.0. + InvalidConfidence, + /// The proposal diff included a source mutation key. + DestructiveDiff, + /// A proposal review transition is not allowed by the lifecycle. + InvalidReviewTransition { + /// Current review state. + from: ConsolidationReviewState, + /// Requested review state. + to: ConsolidationReviewState, + }, + /// A run state transition is not allowed by the job lifecycle. + InvalidRunTransition { + /// Current run state. + from: ConsolidationRunState, + /// Requested run state. + to: ConsolidationRunState, + }, + /// A stored state string is not part of the contract. + UnknownState { + /// Name of the invalid field. + field: &'static str, + }, +} +impl Display for ConsolidationValidationError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::MissingSourceRefs => write!(f, "source_refs must not be empty"), + Self::MissingSourceSnapshot => + write!(f, "source snapshot must include at least one freshness guard"), + Self::InvalidJsonObject { field } => write!(f, "{field} must be a JSON object"), + Self::EmptyText { field } => write!(f, "{field} must not be empty"), + Self::InvalidConfidence => write!(f, "confidence must be in the range 0.0..=1.0"), + Self::DestructiveDiff => write!(f, "proposal diff must not mutate source memory"), + Self::InvalidReviewTransition { from, to } => + write!(f, "invalid proposal review transition from {from:?} to {to:?}"), + Self::InvalidRunTransition { from, to } => + write!(f, "invalid consolidation run transition from {from:?} to {to:?}"), + Self::UnknownState { field } => write!(f, "{field} is not a known state"), + } + } +} +impl Error for ConsolidationValidationError {} + +/// Source artifact kind accepted by consolidation input references. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum ConsolidationSourceKind { + /// Memory note evidence. + Note, + /// Event ingestion source. + Event, + /// Search trace source. + Trace, + /// Search trace item source. + TraceItem, + /// Document extension source. + Doc, + /// Document chunk source. + DocChunk, +} +impl ConsolidationSourceKind { + /// Returns the canonical storage string. + pub fn as_str(self) -> &'static str { + match self { + Self::Note => "note", + Self::Event => "event", + Self::Trace => "trace", + Self::TraceItem => "trace_item", + Self::Doc => "doc", + Self::DocChunk => "doc_chunk", + } + } +} + +/// Immutable source snapshot guard captured before a proposal is stored. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct ConsolidationSourceSnapshot { + /// Source lifecycle status observed by the consolidation run. + pub status: Option, + /// Source last-update timestamp observed by the consolidation run. + pub updated_at: Option, + /// Source content or payload hash, when available. + pub content_hash: Option, + /// Source embedding version, when relevant. + pub embedding_version: Option, + /// Trace schema or trace version, when the source is a trace. + pub trace_version: Option, + #[serde(default)] + /// Opaque source reference copied from the authoritative source. + pub source_ref: Value, + #[serde(default)] + /// Additional snapshot metadata used for replay or review. + pub metadata: Value, +} +impl ConsolidationSourceSnapshot { + /// Validates snapshot shape and immutable freshness guards. + pub fn validate(&self) -> Result<(), ConsolidationValidationError> { + validate_json_object("source_ref", &self.source_ref)?; + validate_json_object("metadata", &self.metadata)?; + + let has_hash = self.content_hash.as_ref().is_some_and(|hash| !hash.trim().is_empty()); + let has_embedding = + self.embedding_version.as_ref().is_some_and(|version| !version.trim().is_empty()); + let has_status = self.status.as_ref().is_some_and(|status| !status.trim().is_empty()); + let has_source_ref = non_empty_object(&self.source_ref); + let has_metadata = non_empty_object(&self.metadata); + let has_guard = self.updated_at.is_some() + || self.trace_version.is_some() + || has_hash + || has_embedding + || has_status + || has_source_ref + || has_metadata; + + if has_guard { Ok(()) } else { Err(ConsolidationValidationError::MissingSourceSnapshot) } + } +} + +/// Stable pointer to one immutable consolidation input. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct ConsolidationInputRef { + /// Kind of source artifact being referenced. + pub kind: ConsolidationSourceKind, + /// Identifier of the source artifact. + pub id: Uuid, + /// Snapshot metadata captured before proposal generation. + pub snapshot: ConsolidationSourceSnapshot, +} +impl ConsolidationInputRef { + /// Validates the input reference and its snapshot guard. + pub fn validate(&self) -> Result<(), ConsolidationValidationError> { + self.snapshot.validate() + } +} + +/// Confidence or honesty marker severity. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum ConsolidationMarkerSeverity { + /// Low-severity marker. + Low, + /// Medium-severity marker. + Medium, + /// High-severity marker. + High, +} + +/// One contradiction or staleness marker attached to a proposal. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct ConsolidationMarker { + /// Marker severity. + pub severity: ConsolidationMarkerSeverity, + /// Human-readable marker text. + pub message: String, + /// Optional source that triggered the marker. + pub source: Option, +} +impl ConsolidationMarker { + /// Validates marker content and optional source evidence. + pub fn validate(&self) -> Result<(), ConsolidationValidationError> { + if self.message.trim().is_empty() { + return Err(ConsolidationValidationError::EmptyText { field: "marker.message" }); + } + + if let Some(source) = &self.source { + source.validate()?; + } + + Ok(()) + } +} + +/// Contradiction and staleness markers attached to a proposal. +#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize)] +pub struct ConsolidationMarkers { + #[serde(default)] + /// Contradiction markers that a reviewer must inspect. + pub contradictions: Vec, + #[serde(default)] + /// Staleness markers that a reviewer must inspect. + pub staleness: Vec, +} +impl ConsolidationMarkers { + /// Validates all marker payloads. + pub fn validate(&self) -> Result<(), ConsolidationValidationError> { + for marker in self.contradictions.iter().chain(self.staleness.iter()) { + marker.validate()?; + } + + Ok(()) + } +} + +/// Derived-output apply intent for a reviewable proposal. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum ConsolidationApplyIntent { + /// Create a new derived memory note after review. + CreateDerivedNote, + /// Update an existing derived memory note after review. + UpdateDerivedNote, + /// Create a derived knowledge page after review. + CreateDerivedKnowledgePage, + /// Update a derived knowledge page after review. + UpdateDerivedKnowledgePage, + /// Create or refresh a derived graph view after review. + CreateDerivedGraphView, + /// Store the proposal for review without applying a downstream derived artifact. + NoOp, +} +impl ConsolidationApplyIntent { + /// Returns the canonical storage string. + pub fn as_str(self) -> &'static str { + match self { + Self::CreateDerivedNote => "create_derived_note", + Self::UpdateDerivedNote => "update_derived_note", + Self::CreateDerivedKnowledgePage => "create_derived_knowledge_page", + Self::UpdateDerivedKnowledgePage => "update_derived_knowledge_page", + Self::CreateDerivedGraphView => "create_derived_graph_view", + Self::NoOp => "no_op", + } + } +} + +/// Review lifecycle for a consolidation proposal. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum ConsolidationReviewState { + /// Proposal is awaiting review. + Proposed, + /// Proposal has been approved for downstream derived-output application. + Approved, + /// Proposal was rejected by a reviewer. + Rejected, + /// Proposal was approved and marked applied to the derived target. + Applied, + /// Proposal is retained but no longer active for review. + Archived, +} +impl ConsolidationReviewState { + /// Returns the canonical storage string. + pub fn as_str(self) -> &'static str { + match self { + Self::Proposed => "proposed", + Self::Approved => "approved", + Self::Rejected => "rejected", + Self::Applied => "applied", + Self::Archived => "archived", + } + } + + /// Parses a canonical storage string. + pub fn parse(raw: &str) -> Option { + match raw { + "proposed" => Some(Self::Proposed), + "approved" => Some(Self::Approved), + "rejected" => Some(Self::Rejected), + "applied" => Some(Self::Applied), + "archived" => Some(Self::Archived), + _ => None, + } + } + + /// Validates a review lifecycle transition. + pub fn validate_transition(self, to: Self) -> Result<(), ConsolidationValidationError> { + let allowed = match self { + Self::Proposed => matches!(to, Self::Approved | Self::Rejected | Self::Archived), + Self::Approved => matches!(to, Self::Applied | Self::Rejected | Self::Archived), + Self::Rejected | Self::Applied | Self::Archived => false, + }; + + if allowed { + Ok(()) + } else { + Err(ConsolidationValidationError::InvalidReviewTransition { from: self, to }) + } + } +} + +/// Consolidation job lifecycle. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum ConsolidationRunState { + /// Job has been registered but has not started. + Pending, + /// Job is actively generating fixture or future provider-backed proposals. + Running, + /// Job completed proposal generation. + Completed, + /// Job failed before completion. + Failed, + /// Job was cancelled by an operator. + Cancelled, +} +impl ConsolidationRunState { + /// Returns the canonical storage string. + pub fn as_str(self) -> &'static str { + match self { + Self::Pending => "pending", + Self::Running => "running", + Self::Completed => "completed", + Self::Failed => "failed", + Self::Cancelled => "cancelled", + } + } + + /// Parses a canonical storage string. + pub fn parse(raw: &str) -> Option { + match raw { + "pending" => Some(Self::Pending), + "running" => Some(Self::Running), + "completed" => Some(Self::Completed), + "failed" => Some(Self::Failed), + "cancelled" => Some(Self::Cancelled), + _ => None, + } + } + + /// Validates a job lifecycle transition. + pub fn validate_transition(self, to: Self) -> Result<(), ConsolidationValidationError> { + let allowed = match self { + Self::Pending => matches!(to, Self::Running | Self::Cancelled), + Self::Running => matches!(to, Self::Completed | Self::Failed | Self::Cancelled), + Self::Completed | Self::Failed | Self::Cancelled => false, + }; + + if allowed { + Ok(()) + } else { + Err(ConsolidationValidationError::InvalidRunTransition { from: self, to }) + } + } +} + +/// Reviewable diff between prior derived output and proposed derived output. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct ConsolidationProposalDiff { + /// Human-readable diff summary. + pub summary: String, + #[serde(default)] + /// Previous derived output snapshot, or an empty object for creates. + pub before: Value, + #[serde(default)] + /// Proposed derived output snapshot. + pub after: Value, +} +impl ConsolidationProposalDiff { + /// Validates diff shape and rejects source-mutation payloads. + pub fn validate(&self) -> Result<(), ConsolidationValidationError> { + if self.summary.trim().is_empty() { + return Err(ConsolidationValidationError::EmptyText { field: "diff.summary" }); + } + + validate_json_object("diff.before", &self.before)?; + validate_json_object("diff.after", &self.after)?; + + if contains_forbidden_diff_key(&self.before) || contains_forbidden_diff_key(&self.after) { + return Err(ConsolidationValidationError::DestructiveDiff); + } + + Ok(()) + } +} + +/// Source lineage for one consolidation proposal. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct ConsolidationLineage { + /// Source references directly supporting the proposal. + pub source_refs: Vec, + /// Parent consolidation run, when this proposal is derived from an earlier run. + pub parent_run_id: Option, + #[serde(default)] + /// Parent proposals used as lineage inputs. + pub parent_proposal_ids: Vec, +} +impl ConsolidationLineage { + /// Validates source lineage references. + pub fn validate(&self) -> Result<(), ConsolidationValidationError> { + validate_source_refs(&self.source_refs) + } +} + +/// Full reviewable consolidation proposal contract. +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct ConsolidationProposalContract { + /// Proposal kind, such as `derived_note` or `knowledge_page`. + pub proposal_kind: String, + /// Derived-output apply intent. + pub apply_intent: ConsolidationApplyIntent, + /// Source references directly supporting the proposal. + pub source_refs: Vec, + #[serde(default)] + /// Aggregate source snapshot metadata for reviewer inspection. + pub source_snapshot: Value, + /// Proposal lineage. + pub lineage: ConsolidationLineage, + /// Model or fixture confidence in the proposal. + pub confidence: f32, + /// Review markers for contradiction and staleness checks. + pub markers: ConsolidationMarkers, + /// Reviewable derived-output diff. + pub diff: ConsolidationProposalDiff, + #[serde(default)] + /// Derived target reference, when the target already exists. + pub target_ref: Value, + #[serde(default)] + /// Proposed derived output payload. + pub proposed_payload: Value, +} +impl ConsolidationProposalContract { + /// Validates a proposal contract before persistence. + pub fn validate(&self) -> Result<(), ConsolidationValidationError> { + if self.proposal_kind.trim().is_empty() { + return Err(ConsolidationValidationError::EmptyText { field: "proposal_kind" }); + } + + validate_source_refs(&self.source_refs)?; + validate_json_object("source_snapshot", &self.source_snapshot)?; + + self.lineage.validate()?; + + if !self.confidence.is_finite() || !(0.0..=1.0).contains(&self.confidence) { + return Err(ConsolidationValidationError::InvalidConfidence); + } + + self.markers.validate()?; + self.diff.validate()?; + + validate_json_object("target_ref", &self.target_ref)?; + validate_json_object("proposed_payload", &self.proposed_payload)?; + + Ok(()) + } +} + +/// Validates a source reference list. +pub fn validate_source_refs( + source_refs: &[ConsolidationInputRef], +) -> Result<(), ConsolidationValidationError> { + if source_refs.is_empty() { + return Err(ConsolidationValidationError::MissingSourceRefs); + } + + for source_ref in source_refs { + source_ref.validate()?; + } + + Ok(()) +} + +fn validate_json_object( + field: &'static str, + value: &Value, +) -> Result<(), ConsolidationValidationError> { + if matches!(value, Value::Object(_)) { + Ok(()) + } else { + Err(ConsolidationValidationError::InvalidJsonObject { field }) + } +} + +fn non_empty_object(value: &Value) -> bool { + match value { + Value::Object(map) => !map.is_empty(), + _ => false, + } +} + +fn contains_forbidden_diff_key(value: &Value) -> bool { + match value { + Value::Object(map) => map.iter().any(|(key, nested)| { + FORBIDDEN_DIFF_KEYS.contains(&key.as_str()) || contains_forbidden_diff_key(nested) + }), + Value::Array(items) => items.iter().any(contains_forbidden_diff_key), + _ => false, + } +} diff --git a/packages/elf-domain/src/lib.rs b/packages/elf-domain/src/lib.rs index d41ccc1f..ec1d2fec 100644 --- a/packages/elf-domain/src/lib.rs +++ b/packages/elf-domain/src/lib.rs @@ -1,5 +1,6 @@ //! Domain-level validation and policy helpers shared across ELF services. +pub mod consolidation; pub mod english_gate; pub mod evidence; pub mod memory_policy; diff --git a/packages/elf-domain/tests/consolidation.rs b/packages/elf-domain/tests/consolidation.rs new file mode 100644 index 00000000..6d815d0f --- /dev/null +++ b/packages/elf-domain/tests/consolidation.rs @@ -0,0 +1,157 @@ +#![allow(unused_crate_dependencies)] + +//! Integration tests for consolidation proposal contract validation. + +use time::OffsetDateTime; +use uuid::Uuid; + +use elf_domain::consolidation::{ + ConsolidationApplyIntent, ConsolidationInputRef, ConsolidationLineage, ConsolidationMarkers, + ConsolidationProposalContract, ConsolidationProposalDiff, ConsolidationReviewState, + ConsolidationRunState, ConsolidationSourceKind, ConsolidationSourceSnapshot, + ConsolidationValidationError, +}; + +#[test] +fn proposal_contract_accepts_reviewable_derived_output() { + let source = source_ref(); + let proposal = proposal_contract(source); + + assert!(proposal.validate().is_ok()); +} + +#[test] +fn source_refs_require_immutable_snapshot_guards() { + let mut source = source_ref(); + + source.snapshot = ConsolidationSourceSnapshot { + status: None, + updated_at: None, + content_hash: None, + embedding_version: None, + trace_version: None, + source_ref: serde_json::json!({}), + metadata: serde_json::json!({}), + }; + + assert_eq!(source.validate(), Err(ConsolidationValidationError::MissingSourceSnapshot)); +} + +#[test] +fn proposal_contract_requires_lineage_source_refs() { + let source = source_ref(); + let mut proposal = proposal_contract(source); + + proposal.lineage.source_refs = Vec::new(); + + assert_eq!(proposal.validate(), Err(ConsolidationValidationError::MissingSourceRefs)); +} + +#[test] +fn proposal_contract_rejects_destructive_diff_payloads() { + let source = source_ref(); + let mut proposal = proposal_contract(source); + + proposal.diff.after = serde_json::json!({ + "summary": "Replace stale source facts.", + "source_mutations": [ + { "kind": "note", "op": "delete" } + ] + }); + + assert_eq!(proposal.validate(), Err(ConsolidationValidationError::DestructiveDiff)); +} + +#[test] +fn destructive_apply_intents_are_not_part_of_the_contract() { + let parsed = + serde_json::from_value::(serde_json::json!("delete_source_note")); + + assert!(parsed.is_err()); +} + +#[test] +fn proposal_lifecycle_requires_approval_before_apply() { + assert!( + ConsolidationReviewState::Proposed + .validate_transition(ConsolidationReviewState::Applied) + .is_err() + ); + assert!( + ConsolidationReviewState::Proposed + .validate_transition(ConsolidationReviewState::Approved) + .is_ok() + ); + assert!( + ConsolidationReviewState::Approved + .validate_transition(ConsolidationReviewState::Applied) + .is_ok() + ); + assert!( + ConsolidationReviewState::Applied + .validate_transition(ConsolidationReviewState::Rejected) + .is_err() + ); +} + +#[test] +fn run_lifecycle_rejects_skipping_generation_state() { + assert!( + ConsolidationRunState::Pending + .validate_transition(ConsolidationRunState::Completed) + .is_err() + ); + assert!( + ConsolidationRunState::Pending.validate_transition(ConsolidationRunState::Running).is_ok() + ); + assert!( + ConsolidationRunState::Running + .validate_transition(ConsolidationRunState::Completed) + .is_ok() + ); +} + +fn proposal_contract(source: ConsolidationInputRef) -> ConsolidationProposalContract { + let lineage = ConsolidationLineage { + source_refs: vec![source.clone()], + parent_run_id: None, + parent_proposal_ids: Vec::new(), + }; + + ConsolidationProposalContract { + proposal_kind: "derived_note".to_string(), + apply_intent: ConsolidationApplyIntent::CreateDerivedNote, + source_refs: vec![source], + source_snapshot: serde_json::json!({ "window": "fixture" }), + lineage, + confidence: 0.85, + markers: ConsolidationMarkers::default(), + diff: ConsolidationProposalDiff { + summary: "Create one derived note from stable evidence.".to_string(), + before: serde_json::json!({}), + after: serde_json::json!({ "text": "Fact: The project keeps consolidation output reviewable." }), + }, + target_ref: serde_json::json!({}), + proposed_payload: serde_json::json!({ + "type": "fact", + "text": "Fact: The project keeps consolidation output reviewable." + }), + } +} + +fn source_ref() -> ConsolidationInputRef { + ConsolidationInputRef { + kind: ConsolidationSourceKind::Note, + id: Uuid::parse_str("11111111-1111-1111-1111-111111111111") + .expect("test UUID must be valid"), + snapshot: ConsolidationSourceSnapshot { + status: Some("active".to_string()), + updated_at: Some(OffsetDateTime::UNIX_EPOCH), + content_hash: Some("blake3:fixture".to_string()), + embedding_version: Some("fixture:model:4".to_string()), + trace_version: None, + source_ref: serde_json::json!({ "schema": "source_ref/v1", "resolver": "fixture" }), + metadata: serde_json::json!({}), + }, + } +} diff --git a/packages/elf-service/src/consolidation.rs b/packages/elf-service/src/consolidation.rs new file mode 100644 index 00000000..b5194834 --- /dev/null +++ b/packages/elf-service/src/consolidation.rs @@ -0,0 +1,622 @@ +//! Fixture-driven consolidation run and proposal service APIs. + +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{ElfService, Error, Result}; +use elf_domain::consolidation::{ + self, CONSOLIDATION_CONTRACT_SCHEMA_V1, ConsolidationApplyIntent, ConsolidationInputRef, + ConsolidationLineage, ConsolidationMarkers, ConsolidationProposalContract, + ConsolidationProposalDiff, ConsolidationReviewState, ConsolidationRunState, + ConsolidationValidationError, +}; +use elf_storage::{ + consolidation::{ConsolidationProposalReviewUpdate, ConsolidationRunStateUpdate}, + models::{ConsolidationProposal, ConsolidationRun}, +}; + +const DEFAULT_LIST_LIMIT: i64 = 50; +const MAX_LIST_LIMIT: i64 = 200; + +/// Request to create a fixture-backed consolidation run. +#[derive(Clone, Debug, Deserialize)] +pub struct ConsolidationRunCreateRequest { + /// Tenant that owns the run. + pub tenant_id: String, + /// Project that owns the run. + pub project_id: String, + /// Agent registering the run. + pub agent_id: String, + /// Job kind, such as `fixture`, `manual`, or `scheduled`. + pub job_kind: String, + /// Input references considered by the run. + pub input_refs: Vec, + #[serde(default)] + /// Aggregate source snapshot metadata for the run. + pub source_snapshot: Value, + /// Run lineage. + pub lineage: ConsolidationLineage, + #[serde(default)] + /// Fixture-generated proposals to persist with this run. + pub proposals: Vec, +} + +/// Fixture proposal input for a consolidation run. +#[derive(Clone, Debug, Deserialize)] +pub struct ConsolidationProposalInput { + /// Proposal kind, such as `derived_note` or `knowledge_page`. + pub proposal_kind: String, + /// Derived-output apply intent. + pub apply_intent: ConsolidationApplyIntent, + /// Source references directly supporting the proposal. + pub source_refs: Vec, + #[serde(default)] + /// Aggregate source snapshot metadata for reviewer inspection. + pub source_snapshot: Value, + /// Proposal lineage. + pub lineage: ConsolidationLineage, + /// Fixture confidence in the proposal. + pub confidence: f32, + #[serde(default)] + /// Review markers for contradiction and staleness checks. + pub markers: ConsolidationMarkers, + /// Reviewable derived-output diff. + pub diff: ConsolidationProposalDiff, + #[serde(default)] + /// Derived target reference, when the target already exists. + pub target_ref: Value, + #[serde(default)] + /// Proposed derived output payload. + pub proposed_payload: Value, +} +impl ConsolidationProposalInput { + fn validate(&self) -> Result<()> { + let contract = ConsolidationProposalContract { + proposal_kind: self.proposal_kind.clone(), + apply_intent: self.apply_intent, + source_refs: self.source_refs.clone(), + source_snapshot: self.source_snapshot.clone(), + lineage: self.lineage.clone(), + confidence: self.confidence, + markers: self.markers.clone(), + diff: self.diff.clone(), + target_ref: self.target_ref.clone(), + proposed_payload: self.proposed_payload.clone(), + }; + + contract.validate().map_err(validation_error) + } +} + +/// Response returned after creating one consolidation run. +#[derive(Clone, Debug, Serialize)] +pub struct ConsolidationRunCreateResponse { + /// Created run. + pub run: ConsolidationRunResponse, + /// Proposals stored with the run. + pub proposals: Vec, +} + +/// Request to get one consolidation run. +#[derive(Clone, Debug, Deserialize)] +pub struct ConsolidationRunGetRequest { + /// Tenant that owns the run. + pub tenant_id: String, + /// Project that owns the run. + pub project_id: String, + /// Run identifier. + pub run_id: Uuid, +} + +/// Request to list consolidation runs. +#[derive(Clone, Debug, Deserialize)] +pub struct ConsolidationRunsListRequest { + /// Tenant that owns the runs. + pub tenant_id: String, + /// Project that owns the runs. + pub project_id: String, + /// Maximum number of runs to return. + pub limit: Option, +} + +/// Response returned by consolidation run listing. +#[derive(Clone, Debug, Serialize)] +pub struct ConsolidationRunsListResponse { + /// Returned runs. + pub runs: Vec, +} + +/// Public consolidation run DTO. +#[derive(Clone, Debug, Serialize)] +pub struct ConsolidationRunResponse { + /// Consolidation run identifier. + pub run_id: Uuid, + /// Tenant that owns the run. + pub tenant_id: String, + /// Project that owns the run. + pub project_id: String, + /// Agent that registered the run. + pub agent_id: String, + /// Versioned consolidation contract schema. + pub contract_schema: String, + /// Job kind, such as fixture, manual, or scheduled. + pub job_kind: String, + /// Current run state. + pub status: String, + /// Serialized input references. + pub input_refs: Value, + /// Aggregate source snapshot metadata. + pub source_snapshot: Value, + /// Serialized run lineage. + pub lineage: Value, + /// Structured error payload for failed runs. + pub error: Value, + /// Creation timestamp. + pub created_at: OffsetDateTime, + /// Last update timestamp. + pub updated_at: OffsetDateTime, + /// Completion timestamp for terminal runs. + pub completed_at: Option, +} +impl From for ConsolidationRunResponse { + fn from(run: ConsolidationRun) -> Self { + Self { + run_id: run.run_id, + tenant_id: run.tenant_id, + project_id: run.project_id, + agent_id: run.agent_id, + contract_schema: run.contract_schema, + job_kind: run.job_kind, + status: run.status, + input_refs: run.input_refs, + source_snapshot: run.source_snapshot, + lineage: run.lineage, + error: run.error, + created_at: run.created_at, + updated_at: run.updated_at, + completed_at: run.completed_at, + } + } +} + +/// Request to get one consolidation proposal. +#[derive(Clone, Debug, Deserialize)] +pub struct ConsolidationProposalGetRequest { + /// Tenant that owns the proposal. + pub tenant_id: String, + /// Project that owns the proposal. + pub project_id: String, + /// Proposal identifier. + pub proposal_id: Uuid, +} + +/// Request to list consolidation proposals. +#[derive(Clone, Debug, Deserialize)] +pub struct ConsolidationProposalsListRequest { + /// Tenant that owns the proposals. + pub tenant_id: String, + /// Project that owns the proposals. + pub project_id: String, + /// Optional run filter. + pub run_id: Option, + /// Optional review-state filter. + pub review_state: Option, + /// Maximum number of proposals to return. + pub limit: Option, +} + +/// Response returned by consolidation proposal listing. +#[derive(Clone, Debug, Serialize)] +pub struct ConsolidationProposalsListResponse { + /// Returned proposals. + pub proposals: Vec, +} + +/// Request to transition a proposal review state. +#[derive(Clone, Debug, Deserialize)] +pub struct ConsolidationProposalReviewRequest { + /// Tenant that owns the proposal. + pub tenant_id: String, + /// Project that owns the proposal. + pub project_id: String, + /// Agent performing the review transition. + pub reviewer_agent_id: String, + /// Proposal identifier. + pub proposal_id: Uuid, + /// Requested review state. + pub review_state: ConsolidationReviewState, + /// Optional reviewer comment. + pub review_comment: Option, +} + +/// Public consolidation proposal DTO. +#[derive(Clone, Debug, Serialize)] +pub struct ConsolidationProposalResponse { + /// Consolidation proposal identifier. + pub proposal_id: Uuid, + /// Parent consolidation run identifier. + pub run_id: Uuid, + /// Tenant that owns the proposal. + pub tenant_id: String, + /// Project that owns the proposal. + pub project_id: String, + /// Agent that registered the proposal. + pub agent_id: String, + /// Versioned consolidation contract schema. + pub contract_schema: String, + /// Proposal kind, such as derived_note or knowledge_page. + pub proposal_kind: String, + /// Derived-output apply intent. + pub apply_intent: String, + /// Current review state. + pub review_state: String, + /// Serialized source references. + pub source_refs: Value, + /// Aggregate source snapshot metadata. + pub source_snapshot: Value, + /// Serialized proposal lineage. + pub lineage: Value, + /// Serialized reviewable diff. + pub diff: Value, + /// Proposal confidence score. + pub confidence: f32, + /// Serialized contradiction markers. + pub contradiction_markers: Value, + /// Serialized staleness markers. + pub staleness_markers: Value, + /// Serialized derived target reference. + pub target_ref: Value, + /// Serialized proposed derived output payload. + pub proposed_payload: Value, + /// Agent that last reviewed the proposal. + pub reviewer_agent_id: Option, + /// Optional reviewer comment. + pub review_comment: Option, + /// Timestamp of the last review transition. + pub reviewed_at: Option, + /// Creation timestamp. + pub created_at: OffsetDateTime, + /// Last update timestamp. + pub updated_at: OffsetDateTime, +} +impl From for ConsolidationProposalResponse { + fn from(proposal: ConsolidationProposal) -> Self { + Self { + proposal_id: proposal.proposal_id, + run_id: proposal.run_id, + tenant_id: proposal.tenant_id, + project_id: proposal.project_id, + agent_id: proposal.agent_id, + contract_schema: proposal.contract_schema, + proposal_kind: proposal.proposal_kind, + apply_intent: proposal.apply_intent, + review_state: proposal.review_state, + source_refs: proposal.source_refs, + source_snapshot: proposal.source_snapshot, + lineage: proposal.lineage, + diff: proposal.diff, + confidence: proposal.confidence, + contradiction_markers: proposal.contradiction_markers, + staleness_markers: proposal.staleness_markers, + target_ref: proposal.target_ref, + proposed_payload: proposal.proposed_payload, + reviewer_agent_id: proposal.reviewer_agent_id, + review_comment: proposal.review_comment, + reviewed_at: proposal.reviewed_at, + created_at: proposal.created_at, + updated_at: proposal.updated_at, + } + } +} + +impl ElfService { + /// Creates a fixture-backed consolidation run and optional proposals. + pub async fn consolidation_run_create( + &self, + req: ConsolidationRunCreateRequest, + ) -> Result { + validate_context(req.tenant_id.as_str(), req.project_id.as_str(), req.agent_id.as_str())?; + validate_job_kind(req.job_kind.as_str())?; + + consolidation::validate_source_refs(&req.input_refs).map_err(validation_error)?; + + validate_object("source_snapshot", &req.source_snapshot)?; + + req.lineage.validate().map_err(validation_error)?; + + for proposal in &req.proposals { + proposal.validate()?; + } + + let has_proposals = !req.proposals.is_empty(); + let now = OffsetDateTime::now_utc(); + let run_state = if has_proposals { + ConsolidationRunState::Running + } else { + ConsolidationRunState::Pending + }; + let run_id = Uuid::new_v4(); + let mut run = ConsolidationRun { + run_id, + tenant_id: req.tenant_id.clone(), + project_id: req.project_id.clone(), + agent_id: req.agent_id.clone(), + contract_schema: CONSOLIDATION_CONTRACT_SCHEMA_V1.to_string(), + job_kind: req.job_kind, + status: run_state.as_str().to_string(), + input_refs: to_value(&req.input_refs)?, + source_snapshot: req.source_snapshot, + lineage: to_value(&req.lineage)?, + error: empty_object(), + created_at: now, + updated_at: now, + completed_at: terminal_time(run_state, now), + }; + let mut proposals = Vec::with_capacity(req.proposals.len()); + let mut tx = self.db.pool.begin().await?; + + elf_storage::consolidation::insert_consolidation_run(&mut *tx, &run).await?; + + for input in req.proposals { + let proposal = proposal_row_from_input( + run_id, + req.tenant_id.as_str(), + req.project_id.as_str(), + req.agent_id.as_str(), + now, + input, + )?; + + elf_storage::consolidation::insert_consolidation_proposal(&mut *tx, &proposal).await?; + + proposals.push(ConsolidationProposalResponse::from(proposal)); + } + + if has_proposals { + run_state + .validate_transition(ConsolidationRunState::Completed) + .map_err(validation_error)?; + + let terminal_error = empty_object(); + + run = elf_storage::consolidation::update_consolidation_run_state( + &mut *tx, + ConsolidationRunStateUpdate { + tenant_id: req.tenant_id.as_str(), + project_id: req.project_id.as_str(), + run_id, + status: ConsolidationRunState::Completed.as_str(), + error: &terminal_error, + now, + }, + ) + .await? + .ok_or_else(|| Error::NotFound { + message: "consolidation run not found".to_string(), + })?; + } + + tx.commit().await?; + + Ok(ConsolidationRunCreateResponse { run: ConsolidationRunResponse::from(run), proposals }) + } + + /// Fetches one consolidation run. + pub async fn consolidation_run_get( + &self, + req: ConsolidationRunGetRequest, + ) -> Result { + let run = elf_storage::consolidation::get_consolidation_run( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + req.run_id, + ) + .await? + .ok_or_else(|| Error::NotFound { message: "consolidation run not found".to_string() })?; + + Ok(ConsolidationRunResponse::from(run)) + } + + /// Lists consolidation runs. + pub async fn consolidation_runs_list( + &self, + req: ConsolidationRunsListRequest, + ) -> Result { + let limit = bounded_limit(req.limit); + let rows = elf_storage::consolidation::list_consolidation_runs( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + limit, + ) + .await?; + let runs = rows.into_iter().map(ConsolidationRunResponse::from).collect(); + + Ok(ConsolidationRunsListResponse { runs }) + } + + /// Fetches one consolidation proposal. + pub async fn consolidation_proposal_get( + &self, + req: ConsolidationProposalGetRequest, + ) -> Result { + let proposal = elf_storage::consolidation::get_consolidation_proposal( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + req.proposal_id, + ) + .await? + .ok_or_else(|| Error::NotFound { + message: "consolidation proposal not found".to_string(), + })?; + + Ok(ConsolidationProposalResponse::from(proposal)) + } + + /// Lists consolidation proposals. + pub async fn consolidation_proposals_list( + &self, + req: ConsolidationProposalsListRequest, + ) -> Result { + let limit = bounded_limit(req.limit); + let review_state = req.review_state.map(ConsolidationReviewState::as_str); + let rows = elf_storage::consolidation::list_consolidation_proposals( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + req.run_id, + review_state, + limit, + ) + .await?; + let proposals = rows.into_iter().map(ConsolidationProposalResponse::from).collect(); + + Ok(ConsolidationProposalsListResponse { proposals }) + } + + /// Applies one allowed proposal review-state transition. + pub async fn consolidation_proposal_review( + &self, + req: ConsolidationProposalReviewRequest, + ) -> Result { + validate_context( + req.tenant_id.as_str(), + req.project_id.as_str(), + req.reviewer_agent_id.as_str(), + )?; + + let existing = elf_storage::consolidation::get_consolidation_proposal( + &self.db.pool, + req.tenant_id.as_str(), + req.project_id.as_str(), + req.proposal_id, + ) + .await? + .ok_or_else(|| Error::NotFound { + message: "consolidation proposal not found".to_string(), + })?; + let current = + ConsolidationReviewState::parse(existing.review_state.as_str()).ok_or_else(|| { + Error::InvalidRequest { + message: "stored proposal review_state is invalid".to_string(), + } + })?; + + current.validate_transition(req.review_state).map_err(validation_error)?; + + let updated = elf_storage::consolidation::update_consolidation_proposal_review( + &self.db.pool, + ConsolidationProposalReviewUpdate { + tenant_id: req.tenant_id.as_str(), + project_id: req.project_id.as_str(), + proposal_id: req.proposal_id, + review_state: req.review_state.as_str(), + reviewer_agent_id: req.reviewer_agent_id.as_str(), + review_comment: req.review_comment.as_deref(), + now: OffsetDateTime::now_utc(), + }, + ) + .await? + .ok_or_else(|| Error::NotFound { + message: "consolidation proposal not found".to_string(), + })?; + + Ok(ConsolidationProposalResponse::from(updated)) + } +} + +fn proposal_row_from_input( + run_id: Uuid, + tenant_id: &str, + project_id: &str, + agent_id: &str, + now: OffsetDateTime, + input: ConsolidationProposalInput, +) -> Result { + Ok(ConsolidationProposal { + proposal_id: Uuid::new_v4(), + run_id, + tenant_id: tenant_id.to_string(), + project_id: project_id.to_string(), + agent_id: agent_id.to_string(), + contract_schema: CONSOLIDATION_CONTRACT_SCHEMA_V1.to_string(), + proposal_kind: input.proposal_kind, + apply_intent: input.apply_intent.as_str().to_string(), + review_state: ConsolidationReviewState::Proposed.as_str().to_string(), + source_refs: to_value(&input.source_refs)?, + source_snapshot: input.source_snapshot, + lineage: to_value(&input.lineage)?, + diff: to_value(&input.diff)?, + confidence: input.confidence, + contradiction_markers: to_value(&input.markers.contradictions)?, + staleness_markers: to_value(&input.markers.staleness)?, + target_ref: input.target_ref, + proposed_payload: input.proposed_payload, + reviewer_agent_id: None, + review_comment: None, + reviewed_at: None, + created_at: now, + updated_at: now, + }) +} + +fn validate_context(tenant_id: &str, project_id: &str, agent_id: &str) -> Result<()> { + validate_non_empty("tenant_id", tenant_id)?; + validate_non_empty("project_id", project_id)?; + + validate_non_empty("agent_id", agent_id) +} + +fn validate_job_kind(job_kind: &str) -> Result<()> { + validate_non_empty("job_kind", job_kind) +} + +fn validate_non_empty(field: &'static str, value: &str) -> Result<()> { + if value.trim().is_empty() { + return Err(Error::InvalidRequest { message: format!("{field} must not be empty.") }); + } + + Ok(()) +} + +fn validate_object(field: &str, value: &Value) -> Result<()> { + if matches!(value, Value::Object(_)) { + Ok(()) + } else { + Err(Error::InvalidRequest { message: format!("{field} must be a JSON object.") }) + } +} + +fn validation_error(err: ConsolidationValidationError) -> Error { + Error::InvalidRequest { message: err.to_string() } +} + +fn bounded_limit(limit: Option) -> i64 { + limit.map(i64::from).unwrap_or(DEFAULT_LIST_LIMIT).clamp(1, MAX_LIST_LIMIT) +} + +fn to_value(value: &T) -> Result +where + T: Serialize, +{ + serde_json::to_value(value).map_err(|err| Error::InvalidRequest { + message: format!("failed to serialize consolidation contract: {err}"), + }) +} + +fn empty_object() -> Value { + Value::Object(Map::new()) +} + +fn terminal_time(state: ConsolidationRunState, now: OffsetDateTime) -> Option { + match state { + ConsolidationRunState::Completed + | ConsolidationRunState::Failed + | ConsolidationRunState::Cancelled => Some(now), + ConsolidationRunState::Pending | ConsolidationRunState::Running => None, + } +} diff --git a/packages/elf-service/src/lib.rs b/packages/elf-service/src/lib.rs index 78c522c5..55f98c4d 100644 --- a/packages/elf-service/src/lib.rs +++ b/packages/elf-service/src/lib.rs @@ -6,6 +6,7 @@ pub mod add_event; pub mod add_note; pub mod admin; pub mod admin_graph_predicates; +pub mod consolidation; pub mod delete; pub mod docs; pub mod graph; @@ -37,6 +38,13 @@ pub use self::{ AdminGraphPredicatePatchRequest, AdminGraphPredicateResponse, AdminGraphPredicatesListRequest, AdminGraphPredicatesListResponse, }, + consolidation::{ + ConsolidationProposalGetRequest, ConsolidationProposalInput, ConsolidationProposalResponse, + ConsolidationProposalReviewRequest, ConsolidationProposalsListRequest, + ConsolidationProposalsListResponse, ConsolidationRunCreateRequest, + ConsolidationRunCreateResponse, ConsolidationRunGetRequest, ConsolidationRunResponse, + ConsolidationRunsListRequest, ConsolidationRunsListResponse, + }, delete::{DeleteRequest, DeleteResponse}, docs::{ DocType, DocsExcerptResponse, DocsExcerptsGetRequest, DocsGetRequest, DocsGetResponse, diff --git a/packages/elf-storage/src/consolidation.rs b/packages/elf-storage/src/consolidation.rs new file mode 100644 index 00000000..c8baeae6 --- /dev/null +++ b/packages/elf-storage/src/consolidation.rs @@ -0,0 +1,446 @@ +//! Consolidation run and proposal persistence queries. + +use serde_json::Value; +use sqlx::PgExecutor; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::{ + Result, + models::{ConsolidationProposal, ConsolidationRun}, +}; + +const CONSOLIDATION_RUN_SELECT: &str = "\ +SELECT + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + job_kind, + status, + input_refs, + source_snapshot, + lineage, + COALESCE(error, '{}'::jsonb) AS error, + created_at, + updated_at, + completed_at +FROM consolidation_runs +WHERE tenant_id = $1 AND project_id = $2 AND run_id = $3 +LIMIT 1"; +const CONSOLIDATION_PROPOSAL_SELECT: &str = "\ +SELECT + proposal_id, + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, + COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, + COALESCE(target_ref, '{}'::jsonb) AS target_ref, + COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, + reviewer_agent_id, + review_comment, + reviewed_at, + created_at, + updated_at +FROM consolidation_proposals +WHERE tenant_id = $1 AND project_id = $2 AND proposal_id = $3 +LIMIT 1"; + +/// Arguments for updating a consolidation run state. +pub struct ConsolidationRunStateUpdate<'a> { + /// Tenant that owns the run. + pub tenant_id: &'a str, + /// Project that owns the run. + pub project_id: &'a str, + /// Run identifier. + pub run_id: Uuid, + /// New run status. + pub status: &'a str, + /// Structured error payload for terminal failure states. + pub error: &'a Value, + /// Update timestamp. + pub now: OffsetDateTime, +} + +/// Arguments for updating a consolidation proposal review state. +pub struct ConsolidationProposalReviewUpdate<'a> { + /// Tenant that owns the proposal. + pub tenant_id: &'a str, + /// Project that owns the proposal. + pub project_id: &'a str, + /// Proposal identifier. + pub proposal_id: Uuid, + /// New review state. + pub review_state: &'a str, + /// Reviewing agent identifier. + pub reviewer_agent_id: &'a str, + /// Optional reviewer comment. + pub review_comment: Option<&'a str>, + /// Update timestamp. + pub now: OffsetDateTime, +} + +/// Inserts one consolidation run. +pub async fn insert_consolidation_run<'e, E>(executor: E, run: &ConsolidationRun) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO consolidation_runs ( + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + job_kind, + status, + input_refs, + source_snapshot, + lineage, + error, + created_at, + updated_at, + completed_at +) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14)", + ) + .bind(run.run_id) + .bind(run.tenant_id.as_str()) + .bind(run.project_id.as_str()) + .bind(run.agent_id.as_str()) + .bind(run.contract_schema.as_str()) + .bind(run.job_kind.as_str()) + .bind(run.status.as_str()) + .bind(&run.input_refs) + .bind(&run.source_snapshot) + .bind(&run.lineage) + .bind(&run.error) + .bind(run.created_at) + .bind(run.updated_at) + .bind(run.completed_at) + .execute(executor) + .await?; + + Ok(()) +} + +/// Fetches one consolidation run by tenant and run identifier. +pub async fn get_consolidation_run<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + run_id: Uuid, +) -> Result> +where + E: PgExecutor<'e>, +{ + let row = sqlx::query_as::<_, ConsolidationRun>(CONSOLIDATION_RUN_SELECT) + .bind(tenant_id) + .bind(project_id) + .bind(run_id) + .fetch_optional(executor) + .await?; + + Ok(row) +} + +/// Lists consolidation runs for one tenant and project. +pub async fn list_consolidation_runs<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + limit: i64, +) -> Result> +where + E: PgExecutor<'e>, +{ + let rows = sqlx::query_as::<_, ConsolidationRun>( + "\ +SELECT + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + job_kind, + status, + input_refs, + source_snapshot, + lineage, + COALESCE(error, '{}'::jsonb) AS error, + created_at, + updated_at, + completed_at +FROM consolidation_runs +WHERE tenant_id = $1 AND project_id = $2 +ORDER BY created_at DESC, run_id DESC +LIMIT $3", + ) + .bind(tenant_id) + .bind(project_id) + .bind(limit) + .fetch_all(executor) + .await?; + + Ok(rows) +} + +/// Updates one consolidation run state. +pub async fn update_consolidation_run_state<'e, E>( + executor: E, + args: ConsolidationRunStateUpdate<'_>, +) -> Result> +where + E: PgExecutor<'e>, +{ + let row = sqlx::query_as::<_, ConsolidationRun>( + "\ +UPDATE consolidation_runs +SET + status = $1, + error = $2, + updated_at = $3, + completed_at = CASE + WHEN $1 IN ('completed', 'failed', 'cancelled') THEN $3 + ELSE completed_at + END +WHERE tenant_id = $4 AND project_id = $5 AND run_id = $6 +RETURNING + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + job_kind, + status, + input_refs, + source_snapshot, + lineage, + COALESCE(error, '{}'::jsonb) AS error, + created_at, + updated_at, + completed_at", + ) + .bind(args.status) + .bind(args.error) + .bind(args.now) + .bind(args.tenant_id) + .bind(args.project_id) + .bind(args.run_id) + .fetch_optional(executor) + .await?; + + Ok(row) +} + +/// Inserts one consolidation proposal. +pub async fn insert_consolidation_proposal<'e, E>( + executor: E, + proposal: &ConsolidationProposal, +) -> Result<()> +where + E: PgExecutor<'e>, +{ + sqlx::query( + "\ +INSERT INTO consolidation_proposals ( + proposal_id, + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + contradiction_markers, + staleness_markers, + target_ref, + proposed_payload, + reviewer_agent_id, + review_comment, + reviewed_at, + created_at, + updated_at +) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23)", + ) + .bind(proposal.proposal_id) + .bind(proposal.run_id) + .bind(proposal.tenant_id.as_str()) + .bind(proposal.project_id.as_str()) + .bind(proposal.agent_id.as_str()) + .bind(proposal.contract_schema.as_str()) + .bind(proposal.proposal_kind.as_str()) + .bind(proposal.apply_intent.as_str()) + .bind(proposal.review_state.as_str()) + .bind(&proposal.source_refs) + .bind(&proposal.source_snapshot) + .bind(&proposal.lineage) + .bind(&proposal.diff) + .bind(proposal.confidence) + .bind(&proposal.contradiction_markers) + .bind(&proposal.staleness_markers) + .bind(&proposal.target_ref) + .bind(&proposal.proposed_payload) + .bind(proposal.reviewer_agent_id.as_deref()) + .bind(proposal.review_comment.as_deref()) + .bind(proposal.reviewed_at) + .bind(proposal.created_at) + .bind(proposal.updated_at) + .execute(executor) + .await?; + + Ok(()) +} + +/// Fetches one consolidation proposal by tenant and proposal identifier. +pub async fn get_consolidation_proposal<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + proposal_id: Uuid, +) -> Result> +where + E: PgExecutor<'e>, +{ + let row = sqlx::query_as::<_, ConsolidationProposal>(CONSOLIDATION_PROPOSAL_SELECT) + .bind(tenant_id) + .bind(project_id) + .bind(proposal_id) + .fetch_optional(executor) + .await?; + + Ok(row) +} + +/// Lists consolidation proposals for one tenant and project. +pub async fn list_consolidation_proposals<'e, E>( + executor: E, + tenant_id: &str, + project_id: &str, + run_id: Option, + review_state: Option<&str>, + limit: i64, +) -> Result> +where + E: PgExecutor<'e>, +{ + let rows = sqlx::query_as::<_, ConsolidationProposal>( + "\ +SELECT + proposal_id, + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, + COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, + COALESCE(target_ref, '{}'::jsonb) AS target_ref, + COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, + reviewer_agent_id, + review_comment, + reviewed_at, + created_at, + updated_at +FROM consolidation_proposals +WHERE tenant_id = $1 + AND project_id = $2 + AND ($3::uuid IS NULL OR run_id = $3) + AND ($4::text IS NULL OR review_state = $4) +ORDER BY created_at DESC, proposal_id DESC +LIMIT $5", + ) + .bind(tenant_id) + .bind(project_id) + .bind(run_id) + .bind(review_state) + .bind(limit) + .fetch_all(executor) + .await?; + + Ok(rows) +} + +/// Updates one proposal review state. +pub async fn update_consolidation_proposal_review<'e, E>( + executor: E, + args: ConsolidationProposalReviewUpdate<'_>, +) -> Result> +where + E: PgExecutor<'e>, +{ + let row = sqlx::query_as::<_, ConsolidationProposal>( + "\ +UPDATE consolidation_proposals +SET + review_state = $1, + reviewer_agent_id = $2, + review_comment = $3, + reviewed_at = $4, + updated_at = $4 +WHERE tenant_id = $5 AND project_id = $6 AND proposal_id = $7 +RETURNING + proposal_id, + run_id, + tenant_id, + project_id, + agent_id, + contract_schema, + proposal_kind, + apply_intent, + review_state, + source_refs, + source_snapshot, + lineage, + diff, + confidence, + COALESCE(contradiction_markers, '[]'::jsonb) AS contradiction_markers, + COALESCE(staleness_markers, '[]'::jsonb) AS staleness_markers, + COALESCE(target_ref, '{}'::jsonb) AS target_ref, + COALESCE(proposed_payload, '{}'::jsonb) AS proposed_payload, + reviewer_agent_id, + review_comment, + reviewed_at, + created_at, + updated_at", + ) + .bind(args.review_state) + .bind(args.reviewer_agent_id) + .bind(args.review_comment) + .bind(args.now) + .bind(args.tenant_id) + .bind(args.project_id) + .bind(args.proposal_id) + .fetch_optional(executor) + .await?; + + Ok(row) +} diff --git a/packages/elf-storage/src/lib.rs b/packages/elf-storage/src/lib.rs index dae9e60b..91c3d369 100644 --- a/packages/elf-storage/src/lib.rs +++ b/packages/elf-storage/src/lib.rs @@ -2,6 +2,7 @@ //! Storage adapters and row models for ELF persistence backends. +pub mod consolidation; pub mod db; pub mod doc_outbox; pub mod docs; diff --git a/packages/elf-storage/src/models.rs b/packages/elf-storage/src/models.rs index f8bec3f9..baf9afb8 100644 --- a/packages/elf-storage/src/models.rs +++ b/packages/elf-storage/src/models.rs @@ -280,6 +280,90 @@ pub struct GraphFactSupersession { pub created_at: OffsetDateTime, } +/// Persisted consolidation run row. +#[derive(Debug, FromRow)] +pub struct ConsolidationRun { + /// Consolidation run identifier. + pub run_id: Uuid, + /// Tenant that owns the run. + pub tenant_id: String, + /// Project that owns the run. + pub project_id: String, + /// Agent that registered the run. + pub agent_id: String, + /// Versioned consolidation contract schema. + pub contract_schema: String, + /// Job kind, such as fixture, manual, or scheduled. + pub job_kind: String, + /// Current run status. + pub status: String, + /// Serialized input references. + pub input_refs: Value, + /// Aggregate source snapshot metadata. + pub source_snapshot: Value, + /// Serialized run lineage. + pub lineage: Value, + /// Structured error payload for failed runs. + pub error: Value, + /// Creation timestamp. + pub created_at: OffsetDateTime, + /// Last update timestamp. + pub updated_at: OffsetDateTime, + /// Completion timestamp for terminal runs. + pub completed_at: Option, +} + +/// Persisted consolidation proposal row. +#[derive(Debug, FromRow)] +pub struct ConsolidationProposal { + /// Consolidation proposal identifier. + pub proposal_id: Uuid, + /// Parent consolidation run identifier. + pub run_id: Uuid, + /// Tenant that owns the proposal. + pub tenant_id: String, + /// Project that owns the proposal. + pub project_id: String, + /// Agent that registered the proposal. + pub agent_id: String, + /// Versioned consolidation contract schema. + pub contract_schema: String, + /// Proposal kind, such as derived_note or knowledge_page. + pub proposal_kind: String, + /// Derived-output apply intent. + pub apply_intent: String, + /// Current review state. + pub review_state: String, + /// Serialized source references. + pub source_refs: Value, + /// Aggregate source snapshot metadata. + pub source_snapshot: Value, + /// Serialized proposal lineage. + pub lineage: Value, + /// Serialized reviewable diff. + pub diff: Value, + /// Proposal confidence score. + pub confidence: f32, + /// Serialized contradiction markers. + pub contradiction_markers: Value, + /// Serialized staleness markers. + pub staleness_markers: Value, + /// Serialized derived target reference. + pub target_ref: Value, + /// Serialized proposed derived output payload. + pub proposed_payload: Value, + /// Agent that last reviewed the proposal. + pub reviewer_agent_id: Option, + /// Optional reviewer comment. + pub review_comment: Option, + /// Timestamp of the last review transition. + pub reviewed_at: Option, + /// Creation timestamp. + pub created_at: OffsetDateTime, + /// Last update timestamp. + pub updated_at: OffsetDateTime, +} + /// Persisted document row. #[derive(Debug, FromRow)] pub struct DocDocument { diff --git a/packages/elf-storage/src/schema.rs b/packages/elf-storage/src/schema.rs index c8a5db3d..4b7e29fd 100644 --- a/packages/elf-storage/src/schema.rs +++ b/packages/elf-storage/src/schema.rs @@ -75,6 +75,10 @@ fn expand_includes(sql: &str) -> String { "tables/030_memory_ingestion_profile_defaults.sql" => out.push_str(include_str!( "../../../sql/tables/030_memory_ingestion_profile_defaults.sql" )), + "tables/031_consolidation_runs.sql" => + out.push_str(include_str!("../../../sql/tables/031_consolidation_runs.sql")), + "tables/032_consolidation_proposals.sql" => out + .push_str(include_str!("../../../sql/tables/032_consolidation_proposals.sql")), "tables/023_memory_ingest_decisions.sql" => out .push_str(include_str!("../../../sql/tables/023_memory_ingest_decisions.sql")), "tables/024_memory_space_grants.sql" => diff --git a/packages/elf-storage/tests/db_smoke.rs b/packages/elf-storage/tests/db_smoke.rs index 47b99b1d..07577e9c 100644 --- a/packages/elf-storage/tests/db_smoke.rs +++ b/packages/elf-storage/tests/db_smoke.rs @@ -43,6 +43,24 @@ fn chunk_tables_exist_after_bootstrap() { assert_eq!(count, 1); + let count: i64 = sqlx::query_scalar( + "SELECT count(*) FROM information_schema.tables WHERE table_name = 'consolidation_runs'", + ) + .fetch_one(&db.pool) + .await + .expect("Failed to query schema tables."); + + assert_eq!(count, 1); + + let count: i64 = sqlx::query_scalar( + "SELECT count(*) FROM information_schema.tables WHERE table_name = 'consolidation_proposals'", + ) + .fetch_one(&db.pool) + .await + .expect("Failed to query schema tables."); + + assert_eq!(count, 1); + let count: i64 = sqlx::query_scalar( "SELECT count(*) FROM information_schema.tables WHERE table_name = 'memory_space_grants'", ) diff --git a/sql/init.sql b/sql/init.sql index 1795f167..780778f4 100644 --- a/sql/init.sql +++ b/sql/init.sql @@ -29,3 +29,5 @@ \ir tables/028_doc_indexing_outbox.sql \ir tables/029_memory_ingestion_profiles.sql \ir tables/030_memory_ingestion_profile_defaults.sql +\ir tables/031_consolidation_runs.sql +\ir tables/032_consolidation_proposals.sql diff --git a/sql/tables/031_consolidation_runs.sql b/sql/tables/031_consolidation_runs.sql new file mode 100644 index 00000000..ca7504d2 --- /dev/null +++ b/sql/tables/031_consolidation_runs.sql @@ -0,0 +1,52 @@ +CREATE TABLE IF NOT EXISTS consolidation_runs ( + run_id uuid PRIMARY KEY, + tenant_id text NOT NULL, + project_id text NOT NULL, + agent_id text NOT NULL, + contract_schema text NOT NULL, + job_kind text NOT NULL, + status text NOT NULL, + input_refs jsonb NOT NULL, + source_snapshot jsonb NOT NULL, + lineage jsonb NOT NULL, + error jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + completed_at timestamptz NULL +); + +ALTER TABLE consolidation_runs + DROP CONSTRAINT IF EXISTS ck_consolidation_runs_status; +ALTER TABLE consolidation_runs + ADD CONSTRAINT ck_consolidation_runs_status + CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled')); + +ALTER TABLE consolidation_runs + DROP CONSTRAINT IF EXISTS ck_consolidation_runs_input_refs; +ALTER TABLE consolidation_runs + ADD CONSTRAINT ck_consolidation_runs_input_refs + CHECK (jsonb_typeof(input_refs) = 'array'); + +ALTER TABLE consolidation_runs + DROP CONSTRAINT IF EXISTS ck_consolidation_runs_source_snapshot; +ALTER TABLE consolidation_runs + ADD CONSTRAINT ck_consolidation_runs_source_snapshot + CHECK (jsonb_typeof(source_snapshot) = 'object'); + +ALTER TABLE consolidation_runs + DROP CONSTRAINT IF EXISTS ck_consolidation_runs_lineage; +ALTER TABLE consolidation_runs + ADD CONSTRAINT ck_consolidation_runs_lineage + CHECK (jsonb_typeof(lineage) = 'object'); + +ALTER TABLE consolidation_runs + DROP CONSTRAINT IF EXISTS ck_consolidation_runs_error; +ALTER TABLE consolidation_runs + ADD CONSTRAINT ck_consolidation_runs_error + CHECK (jsonb_typeof(error) = 'object'); + +CREATE INDEX IF NOT EXISTS idx_consolidation_runs_context_created + ON consolidation_runs (tenant_id, project_id, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_consolidation_runs_status_updated + ON consolidation_runs (tenant_id, project_id, status, updated_at DESC); diff --git a/sql/tables/032_consolidation_proposals.sql b/sql/tables/032_consolidation_proposals.sql new file mode 100644 index 00000000..3b3addc5 --- /dev/null +++ b/sql/tables/032_consolidation_proposals.sql @@ -0,0 +1,106 @@ +CREATE TABLE IF NOT EXISTS consolidation_proposals ( + proposal_id uuid PRIMARY KEY, + run_id uuid NOT NULL REFERENCES consolidation_runs(run_id) ON DELETE CASCADE, + tenant_id text NOT NULL, + project_id text NOT NULL, + agent_id text NOT NULL, + contract_schema text NOT NULL, + proposal_kind text NOT NULL, + apply_intent text NOT NULL, + review_state text NOT NULL, + source_refs jsonb NOT NULL, + source_snapshot jsonb NOT NULL, + lineage jsonb NOT NULL, + diff jsonb NOT NULL, + confidence real NOT NULL, + contradiction_markers jsonb NOT NULL DEFAULT '[]'::jsonb, + staleness_markers jsonb NOT NULL DEFAULT '[]'::jsonb, + target_ref jsonb NOT NULL DEFAULT '{}'::jsonb, + proposed_payload jsonb NOT NULL DEFAULT '{}'::jsonb, + reviewer_agent_id text NULL, + review_comment text NULL, + reviewed_at timestamptz NULL, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +ALTER TABLE consolidation_proposals + DROP CONSTRAINT IF EXISTS ck_consolidation_proposals_apply_intent; +ALTER TABLE consolidation_proposals + ADD CONSTRAINT ck_consolidation_proposals_apply_intent + CHECK ( + apply_intent IN ( + 'create_derived_note', + 'update_derived_note', + 'create_derived_knowledge_page', + 'update_derived_knowledge_page', + 'create_derived_graph_view', + 'no_op' + ) + ); + +ALTER TABLE consolidation_proposals + DROP CONSTRAINT IF EXISTS ck_consolidation_proposals_review_state; +ALTER TABLE consolidation_proposals + ADD CONSTRAINT ck_consolidation_proposals_review_state + CHECK (review_state IN ('proposed', 'approved', 'rejected', 'applied', 'archived')); + +ALTER TABLE consolidation_proposals + DROP CONSTRAINT IF EXISTS ck_consolidation_proposals_source_refs; +ALTER TABLE consolidation_proposals + ADD CONSTRAINT ck_consolidation_proposals_source_refs + CHECK (jsonb_typeof(source_refs) = 'array'); + +ALTER TABLE consolidation_proposals + DROP CONSTRAINT IF EXISTS ck_consolidation_proposals_source_snapshot; +ALTER TABLE consolidation_proposals + ADD CONSTRAINT ck_consolidation_proposals_source_snapshot + CHECK (jsonb_typeof(source_snapshot) = 'object'); + +ALTER TABLE consolidation_proposals + DROP CONSTRAINT IF EXISTS ck_consolidation_proposals_lineage; +ALTER TABLE consolidation_proposals + ADD CONSTRAINT ck_consolidation_proposals_lineage + CHECK (jsonb_typeof(lineage) = 'object'); + +ALTER TABLE consolidation_proposals + DROP CONSTRAINT IF EXISTS ck_consolidation_proposals_diff; +ALTER TABLE consolidation_proposals + ADD CONSTRAINT ck_consolidation_proposals_diff + CHECK (jsonb_typeof(diff) = 'object'); + +ALTER TABLE consolidation_proposals + DROP CONSTRAINT IF EXISTS ck_consolidation_proposals_confidence; +ALTER TABLE consolidation_proposals + ADD CONSTRAINT ck_consolidation_proposals_confidence + CHECK (confidence >= 0.0 AND confidence <= 1.0); + +ALTER TABLE consolidation_proposals + DROP CONSTRAINT IF EXISTS ck_consolidation_proposals_contradiction_markers; +ALTER TABLE consolidation_proposals + ADD CONSTRAINT ck_consolidation_proposals_contradiction_markers + CHECK (jsonb_typeof(contradiction_markers) = 'array'); + +ALTER TABLE consolidation_proposals + DROP CONSTRAINT IF EXISTS ck_consolidation_proposals_staleness_markers; +ALTER TABLE consolidation_proposals + ADD CONSTRAINT ck_consolidation_proposals_staleness_markers + CHECK (jsonb_typeof(staleness_markers) = 'array'); + +ALTER TABLE consolidation_proposals + DROP CONSTRAINT IF EXISTS ck_consolidation_proposals_target_ref; +ALTER TABLE consolidation_proposals + ADD CONSTRAINT ck_consolidation_proposals_target_ref + CHECK (jsonb_typeof(target_ref) = 'object'); + +ALTER TABLE consolidation_proposals + DROP CONSTRAINT IF EXISTS ck_consolidation_proposals_proposed_payload; +ALTER TABLE consolidation_proposals + ADD CONSTRAINT ck_consolidation_proposals_proposed_payload + CHECK (jsonb_typeof(proposed_payload) = 'object'); + +CREATE INDEX IF NOT EXISTS idx_consolidation_proposals_run_created + ON consolidation_proposals (run_id, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_consolidation_proposals_context_state_created + ON consolidation_proposals (tenant_id, project_id, review_state, created_at DESC);