diff --git a/CHANGELOG.md b/CHANGELOG.md index d1de82b..aeb264a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ This format follows [Keep a Changelog](https://keepachangelog.com/) and adheres ## [Unreleased] +### Added +- **Retrieval telemetry can now be imported as evaluation datasets.** The new + `telemetry_imports` config contract and `agentops telemetry validate`, + `agentops telemetry preview`, and `agentops telemetry import` commands let + teams turn reviewed retrieval telemetry into dataset-backed eval rows with + `response_source: dataset`. Grey-box HTTP agents can map `response_fields` from + `$response.context`, and the evaluation docs now cover the import workflow and + contract. + ### Changed - **PR-stage Foundry prompt-agent versions are now tagged at the source.** When `agentops.pipeline.prompt_deploy stage` runs in a PR context (GitHub Actions diff --git a/README.md b/README.md index df83e83..e7428c8 100644 --- a/README.md +++ b/README.md @@ -184,7 +184,10 @@ Doctor/Cockpit, and `[mcp]` for MCP. - `agentops eval analyze` - check eval readiness. - `agentops eval init` - bootstrap an azd `eval.yaml` recipe and wire `execution: azd`. - `agentops eval run [--baseline PATH]` - run an evaluation. -- `agentops eval promote-traces --source FILE [--apply]` - promote traces. +- `agentops eval promote-traces --source FILE [--apply]` - promote local trace export files. +- `agentops telemetry validate NAME` - validate an Azure Monitor or Application Insights import. +- `agentops telemetry preview NAME --rows N` - preview telemetry import rows. +- `agentops telemetry import NAME --apply` - write the imported telemetry dataset. - `agentops report generate` - regenerate `report.md`. - `agentops workflow analyze` - recommend CI/CD shape. - `agentops workflow generate` - generate CI/CD workflows. @@ -217,6 +220,7 @@ Cockpit sections, in display order: - [Foundry Prompt Agent tutorial](docs/tutorial-prompt-agent-quickstart.md) - use this when the Foundry target is `agent: name:version`. Walks the sandbox → dev journey with a PR gate. - [Hosted or HTTP Agent tutorial](docs/tutorial-hosted-agent-quickstart.md) - use this when the target is a Foundry hosted or HTTP endpoint URL. Same sandbox → dev journey for endpoint-based agents. - [End-to-end tutorial](docs/tutorial-end-to-end.md) - extends either of the above with the full sandbox → dev → qa → prod promotion, Foundry red-team scans, and trace-to-regression promotion. +- [Evaluation paths](docs/evaluation.md) - choose static dataset, grey-box HTTP, or telemetry/trace import. - [Core concepts](docs/concepts.md) - [How it works](docs/how-it-works.md) - [Doctor explained](docs/doctor-explained.md) diff --git a/docs/concepts.md b/docs/concepts.md index 93be6d3..39ce4ef 100644 --- a/docs/concepts.md +++ b/docs/concepts.md @@ -96,8 +96,10 @@ Common `agent:` values: | `"model:gpt-4o-mini"` | Direct model deployment | HTTP targets can add top-level mapping fields such as `request_field`, -`response_field`, `tool_calls_field`, `auth_header_env`, and -`extra_fields`. +`response_fields`, `tool_calls_field`, `auth_header_env`, and `extra_fields`. +Use `response_fields.response` for the final answer and +`response_fields.context` for retrieved context. Use `response_source: dataset` +when each dataset row already contains the response to evaluate. ### Dataset @@ -198,6 +200,8 @@ AgentOps auto-selects common evaluation patterns from the dataset: Use one of the three hands-on tutorials for scenario coverage: +- [Evaluation paths](evaluation.md) explains when to use a static dataset, + grey-box HTTP response mapping, or telemetry/trace import. - [Foundry Prompt Agent tutorial](tutorial-prompt-agent-quickstart.md) for Foundry prompt agents referenced as `name:version`. - [Hosted or HTTP Agent tutorial](tutorial-hosted-agent-quickstart.md) for Foundry @@ -215,9 +219,13 @@ the fields your target needs: version: 1 agent: "https://api.example.com/chat" dataset: .agentops/data/support.jsonl +response_source: agent +protocol: http-json request_field: message -response_field: text +response_fields: + response: text + context: retrieved_context thresholds: coherence: ">=3" diff --git a/docs/evaluation.md b/docs/evaluation.md new file mode 100644 index 0000000..aedaee6 --- /dev/null +++ b/docs/evaluation.md @@ -0,0 +1,257 @@ +# Evaluation + +Use this page when you need to choose how AgentOps should evaluate a RAG or +agent workflow. The goal is simple: pick the path that matches where your +evidence comes from, run the evaluation, and keep the result in a format that +reviewers can trust. + +AgentOps supports three evaluation paths: + +1. **Static dataset**: use a JSONL file that already contains the prompt, + expected answer, and optional retrieval context. +2. **Grey-box HTTP**: call an HTTP endpoint and extract both the answer and + retrieval context from the live response. +3. **Telemetry/trace import**: import production traces into a reviewable + dataset so real traffic can become future regression coverage. + +## Choose a path + +| Path | Use it when | Best first step | +|---|---|---| +| Static dataset | You already know the test cases, expected answers, and optionally the target responses. | Create or edit `.agentops/data/*.jsonl`. | +| Grey-box HTTP | Your endpoint can return the answer plus retrieval details for the same request. | Configure `request_field` and `response_fields`. | +| Telemetry/trace import | You want to learn from production traffic before adding new regression rows. | Configure `telemetry_imports`, then run `agentops telemetry preview`. | + +The paths build on each other. Most teams start with a static dataset, add +grey-box HTTP when they need retrieval telemetry, then use telemetry import after +the agent is running in production. + +```mermaid +flowchart LR + Static[Static dataset] --> HTTP[Grey-box HTTP] + HTTP --> Traces[Telemetry import] + Traces --> Static +``` + +## Static dataset + +Choose this path when the data you need is already in the dataset file. Each row +is a test case. AgentOps sends `input` to the target, compares the target +response with `expected`, and uses `context` when present to select RAG +evaluators. + +By default, `response_source: agent` means AgentOps calls the configured target. +Use `response_source: dataset` only when the dataset already includes the answer +you want to evaluate in a `response`, `prediction`, `output`, or `answer` field. +That is useful for offline review or imported trace rows that should not call a +live endpoint again. + +Minimal RAG row: + +```json +{"id":"refund-001","input":"What is the refund window?","expected":"Customers can request a refund within 30 days.","context":"Refunds are available for 30 days after purchase."} +``` + +Minimal config: + +```yaml +version: 1 +agent: "support-agent:3" +dataset: .agentops/data/rag-smoke.jsonl +response_source: agent + +thresholds: + groundedness: ">=3" + retrieval: ">=3" + response_completeness: ">=3" +``` + +Run it: + +```powershell +agentops eval analyze +agentops eval run +``` + +Use this path for: + +- Fast local checks before opening a PR. +- CI gates with stable examples. +- Baseline comparison with `agentops eval run --baseline`. +- Manual review of newly written or newly labeled examples. + +## Grey-box HTTP + +Choose this path when the endpoint can return more than final text. This is the +best path for RAG services because the evaluator can see what the agent actually +retrieved for the request. + +The endpoint response should include: + +- the final answer; +- retrieval context, citations, or document chunks; +- optional tool calls or workflow metadata. + +Example endpoint response: + +```json +{ + "answer": "Customers can request a refund within 30 days.", + "context": [ + "Refunds are available for 30 days after purchase.", + "Refunds require the original order number." + ], + "citations": ["refund-policy.md"] +} +``` + +Example config: + +```yaml +version: 1 +agent: "https://support-dev.example.com/chat" +dataset: .agentops/data/rag-smoke.jsonl + +protocol: http-json +request_field: message +response_fields: + response: answer + context: context + citations: citations + +thresholds: + groundedness: ">=3" + retrieval: ">=3" + relevance: ">=3" +``` + +What happens: + +1. AgentOps reads each row from the dataset. +2. It sends `row.input` as the HTTP request field named by `request_field`. +3. It extracts the final answer from `response_fields.response`. +4. It extracts retrieval context from `response_fields.context`. +5. RAG evaluators can use the extracted context through `$response.context`, + `$retrieved_context`, or `$retrieved_context_items`. + +Use dot paths when fields are nested: + +```yaml +response_fields: + response: output.text + context: output.retrieval.chunks +``` + +Use this path for: + +- RAG services where the retrieved chunks matter. +- Debugging why a groundedness or retrieval score changed. +- Endpoint-based agents hosted in Azure Container Apps, AKS, Foundry Hosted + Agents, or another HTTP host. + +## Telemetry import + +Choose this path when production traffic has useful examples that are not yet in +your test set. Telemetry import does not make production responses automatically +correct. It creates reviewable dataset candidates. + +Configure a named telemetry import in `agentops.yaml`: + +```yaml +telemetry_imports: + - name: prod-rag + target: application-insights + resource_id: $APPINSIGHTS_RESOURCE_ID + time_range: + lookback_days: 7 + filters: + customDimensions.agent: support-agent + fields: + input: customDimensions.question + response: customDimensions.answer + context: customDimensions.retrieved_context + trace_id: operation_Id + output: + path: .agentops/data/prod-rag-candidates.jsonl + label_mode: pending +``` + +Validate the import without querying Azure: + +```powershell +agentops telemetry validate prod-rag +``` + +Preview rows from Azure Monitor: + +```powershell +agentops telemetry preview prod-rag --rows 10 +``` + +Write the candidate dataset and manifest: + +```powershell +agentops telemetry import prod-rag --apply +``` + +Label modes: + +| Mode | What it writes | Use it when | +|---|---|---| +| `pending` | Empty `expected` values with review metadata. | A human must write the correct answer before the row can gate a release. | +| `self-similarity` | The production response becomes `expected`. | You want drift detection against known production behavior. | + +Telemetry import keeps lineage metadata such as trace ID, timestamp, replay URL, +and source system when those values exist in the export. If the trace includes +retrieval context, AgentOps writes it as `context` so RAG evaluators can use it +later. Evaluator mappings can also use `$telemetry.trace_id` when a trace ID is +needed for reporting or troubleshooting. + +If you already have a local trace export file, `agentops eval promote-traces` +still works. Use `agentops telemetry` when the source is Azure Monitor or +Application Insights. + +Use this path for: + +- Turning incidents or surprising production answers into regression tests. +- Sampling real traffic for future review. +- Building a trace-to-dataset flywheel without skipping human judgment. + +## Input mapping + +Evaluator inputs come from three places: + +| Source | Placeholder | Example | +|---|---|---| +| Dataset prompt | `$row.input` or `$prompt` | User question sent to the agent. | +| Dataset expected answer | `$row.expected` or `$expected` | Ground truth or acceptance criteria. | +| Agent response | `$response.response` or `$prediction` | Final answer returned by the target. | +| Any response field | `$response.` | Any field extracted through `response_fields`. | +| Extracted retrieval context | `$response.context`, `$retrieved_context`, or `$retrieved_context_items` | Chunks, citations, or grounding text from the live response. | +| Dataset retrieval context | `$row.context` | Static context stored in JSONL. | +| Trace ID | `$telemetry.trace_id` | Azure Monitor or Application Insights operation ID. | + +For beginners, the easiest rule is: + +- Put known test data in the dataset. +- Put live endpoint outputs under `response_fields`. +- Let AgentOps map the common fields to evaluators. + +Only customize evaluator selection when the automatic choice is not enough: + +```yaml +evaluators: + - GroundednessEvaluator + - RetrievalEvaluator + - RelevanceEvaluator +``` + +## Safety notes + +- Do not treat production responses as ground truth without review. +- Do not import sensitive trace payloads into a repository dataset. +- Keep secrets in environment variables or `.agentops/.env`, not in JSONL files. +- Prefer `--label-mode pending` when correctness matters. +- Use `self-similarity` only for drift detection. +- Keep trace replay links in metadata so reviewers can investigate the original + runtime behavior. diff --git a/docs/foundry-evaluation-sdk-built-in-evaluators.md b/docs/foundry-evaluation-sdk-built-in-evaluators.md index 87cfe85..e6cba1a 100644 --- a/docs/foundry-evaluation-sdk-built-in-evaluators.md +++ b/docs/foundry-evaluation-sdk-built-in-evaluators.md @@ -1,218 +1,199 @@ -# Foundry Evaluation SDK Built-in Evaluators (AgentOps) +# Foundry Evaluators -This guide maps Microsoft Foundry built-in evaluators to the configuration model used by AgentOps Toolkit. +This page explains how AgentOps maps Microsoft Foundry Evaluation SDK +evaluators to the data in `agentops.yaml`, dataset rows, HTTP responses, and +trace imports. -## 1) AgentOps config shape (quick reference) +Most users do not need to configure evaluator internals. AgentOps selects common +evaluators from the target type and dataset shape. Use this page when you need +to understand what each evaluator receives. -In AgentOps, each evaluator is configured under `bundle.evaluators[]`: +## Config shape + +The normal config stays small: ```yaml -evaluators: - - name: SimilarityEvaluator - source: foundry - enabled: true - config: - kind: builtin # builtin | custom - class_name: SimilarityEvaluator - init: # constructor kwargs - model_config: - azure_endpoint: ${env:AZURE_OPENAI_ENDPOINT} - azure_deployment: ${env:AZURE_OPENAI_DEPLOYMENT} - input_mapping: # evaluator call kwargs - query: $prompt - response: $prediction - ground_truth: $expected - score_keys: # ordered keys to read numeric score - - similarity - - score +version: 1 +agent: "https://support-dev.example.com/chat" +dataset: .agentops/data/rag-smoke.jsonl +response_source: agent + +protocol: http-json +request_field: message +response_fields: + response: answer + context: context + +thresholds: + groundedness: ">=3" + retrieval: ">=3" + coherence: ">=3" ``` -## 2) Global requirements by evaluator family - -- AI-assisted quality evaluators use a judge model (`model_config`) in Azure OpenAI/OpenAI schema. -- Risk/safety evaluators and `GroundednessProEvaluator` use `azure_ai_project` instead of GPT deployment in `model_config`. -- Agent evaluators require agent-style payloads (`query/response` as messages, and often tool metadata). -- NLP evaluators (`F1`, `BLEU`, `GLEU`, `ROUGE`, `METEOR`) are non-LLM evaluators and usually need `response` + `ground_truth`. - -## 3) Built-in evaluators and required AgentOps parameters - -| Evaluator | Category | Typical required inputs | Backend init requirements | AgentOps `config` minimum | -|---|---|---|---|---| -| `CoherenceEvaluator` | General purpose | `query`, `response` | `model_config` (AI-assisted) | `kind: builtin`, `class_name`, `input_mapping(query,response)`, `score_keys` | -| `FluencyEvaluator` | General purpose | `query`, `response` | `model_config` (AI-assisted) | same as above | -| `SimilarityEvaluator` | Textual similarity | `query`, `response`, `ground_truth` | `model_config` (AI-assisted) | `input_mapping(query,response,ground_truth)` | -| `F1ScoreEvaluator` | Textual similarity (NLP) | `response`, `ground_truth` | none beyond class init defaults | `input_mapping(response,ground_truth)` | -| `BleuScoreEvaluator` | Textual similarity (NLP) | `response`, `ground_truth` | none beyond class init defaults | `input_mapping(response,ground_truth)` | -| `GleuScoreEvaluator` | Textual similarity (NLP) | `response`, `ground_truth` | none beyond class init defaults | `input_mapping(response,ground_truth)` | -| `RougeScoreEvaluator` | Textual similarity (NLP) | `response`, `ground_truth` | optional `rouge_type` in `init` | `input_mapping(response,ground_truth)` | -| `MeteorScoreEvaluator` | Textual similarity (NLP) | `response`, `ground_truth` | optional `alpha/beta/gamma/threshold` in `init` | `input_mapping(response,ground_truth)` | -| `RetrievalEvaluator` | RAG | usually `query`, `response`, `context` | `model_config` (AI-assisted) | `input_mapping(query,response,context)` | -| `DocumentRetrievalEvaluator` | RAG | retrieval outputs + `ground_truth` | check SDK class contract | explicit `input_mapping` recommended | -| `GroundednessEvaluator` | RAG | `response`, `context` (optional `query`) | `model_config` (AI-assisted) | `input_mapping(response,context[,query])` | -| `GroundednessProEvaluator` *(preview)* | RAG | `query/response/context` or conversation | `azure_ai_project` (not GPT model_config) | `init.azure_ai_project`, explicit `input_mapping` | -| `RelevanceEvaluator` | RAG | `query`, `response` (optionally `context`) | `model_config` (AI-assisted) | `input_mapping(query,response[,context])` | -| `ResponseCompletenessEvaluator` | RAG | `response`, `ground_truth` (often query optional) | AI-assisted quality path | explicit `input_mapping`, `score_keys` | -| `HateUnfairnessEvaluator` | Safety | `query`, `response`/conversation | `azure_ai_project` | `init.azure_ai_project`, `input_mapping` | -| `SexualEvaluator` | Safety | `query`, `response`/conversation | `azure_ai_project` | same | -| `ViolenceEvaluator` | Safety | `query`, `response`/conversation | `azure_ai_project` | same | -| `SelfHarmEvaluator` | Safety | `query`, `response`/conversation | `azure_ai_project` | same | -| `ContentSafetyEvaluator` | Safety composite | query/response or conversation | `azure_ai_project` | `init.azure_ai_project`, `input_mapping` | -| `ProtectedMaterialEvaluator` | Safety | query/response or multimodal | `azure_ai_project` | `init.azure_ai_project`, `input_mapping` | -| `CodeVulnerabilityEvaluator` | Safety/risk | text/code response | `azure_ai_project` | `init.azure_ai_project`, `input_mapping(response[,query])` | -| `UngroundedAttributesEvaluator` | Safety/risk | text response | `azure_ai_project` | `init.azure_ai_project`, `input_mapping(response[,query])` | -| `IndirectAttackEvaluator` | Safety/risk | conversation-oriented input | `azure_ai_project` | `init.azure_ai_project`, `input_mapping(conversation/query,response)` | -| `IntentResolutionEvaluator` *(preview)* | Agent | `query`, `response` (string or message list) | agent evaluator path | `input_mapping(query,response[,tool_definitions])` | -| `TaskAdherenceEvaluator` *(preview)* | Agent | `query`, `response` (string or message list) | agent evaluator path | `input_mapping(query,response[,tool_calls])` | -| `ToolCallAccuracyEvaluator` *(preview)* | Agent | `query`; plus `response` or `tool_calls`; `tool_definitions` required | agent evaluator path | `input_mapping(query,response,tool_calls,tool_definitions)` | -| `TaskCompletionEvaluator` *(preview)* | Agent | agent run/conversation payload | preview; use latest SDK docs | explicit `input_mapping`, explicit `score_keys` | -| `TaskNavigationEfficiencyEvaluator` *(preview)* | Agent | tool/call sequence + expected path context | preview; evolving | explicit `input_mapping`, explicit `score_keys` | -| `ToolSelectionEvaluator` *(preview)* | Agent | query/response + selected tools + tool defs | preview; evolving | explicit `input_mapping`, explicit `score_keys` | -| `ToolInputAccuracyEvaluator` *(preview)* | Agent | tool args + tool defs + context | preview; evolving | explicit `input_mapping`, explicit `score_keys` | -| `ToolOutputUtilizationEvaluator` *(preview)* | Agent | tool outputs + final response | preview; evolving | explicit `input_mapping`, explicit `score_keys` | -| `ToolCallSuccessEvaluator` *(preview)* | Agent | tool execution results/status | preview; evolving | explicit `input_mapping`, explicit `score_keys` | -| `QAEvaluator` | Composite quality | `query`, `response`, `ground_truth`, `context` | `model_config` (AI-assisted composite) | `input_mapping(query,response,ground_truth,context)` | -| `AzureOpenAILabelGrader` | Azure OpenAI grader | template-driven (often conversation/query/response) | grader init requires template/model config | explicit `init` + explicit `input_mapping` | -| `AzureOpenAIStringCheckGrader` | Azure OpenAI grader | template-driven text fields | grader init requires template | explicit `init` + explicit `input_mapping` | -| `AzureOpenAITextSimilarityGrader` | Azure OpenAI grader | text + `ground_truth` equivalent | grader init requires template/model config | explicit `init` + explicit `input_mapping` | -| `AzureOpenAIGrader` | Azure OpenAI grader | template-defined | grader init requires rubric/template | explicit `init` + explicit `input_mapping` | - -## 4) Practical rules for AgentOps bundles - -- Always set `source: foundry` for Foundry SDK evaluators. -- For preview evaluators, always provide explicit: - - `config.class_name` - - `config.input_mapping` - - `config.score_keys` -- Prefer explicit `input_mapping` even when defaults might work. -- Keep `thresholds[].evaluator` exactly equal to `evaluators[].name`. -- For agent evaluators, use structured fields in dataset rows (messages, tool calls, tool definitions) and map with `$row.`. - -## 5) Examples by evaluator type - -The following examples show one practical bundle snippet for each evaluator family used in AgentOps: - -- `5.1` AI-assisted quality evaluators (`model_config`) -- `5.2` Risk/safety evaluators (`azure_ai_project`) -- `5.3` Agent evaluators (message/tool payloads) -- `5.4` NLP evaluators (non-LLM) - -## 5.1) Example for AI-assisted quality evaluator (`model_config`) +Use `evaluators:` only when you want to override the automatic choice: ```yaml evaluators: - - name: RelevanceEvaluator - source: foundry - enabled: true - config: - kind: builtin - class_name: RelevanceEvaluator - init: - model_config: - azure_endpoint: ${env:AZURE_OPENAI_ENDPOINT} - azure_deployment: ${env:AZURE_OPENAI_DEPLOYMENT} - input_mapping: - query: $prompt - response: $prediction - score_keys: - - relevance - - score + - GroundednessEvaluator + - RetrievalEvaluator + - RelevanceEvaluator +``` -thresholds: - - evaluator: RelevanceEvaluator - criteria: ">=" - value: 3 +## Evaluator families + +| Family | What it checks | Common inputs | +|---|---|---| +| Quality judges | The answer is coherent, fluent, similar, complete, or relevant. | prompt, response, expected answer | +| RAG judges | The answer uses retrieved context and the retrieval is useful. | prompt, response, context | +| Safety judges | The answer avoids harmful or protected content. | prompt, response | +| Agent judges | Tool use and agent workflow behavior are correct. | prompt, response, tool calls, tool definitions | +| Local metrics | Scores that do not need a judge model. | response, expected answer, latency | + +## Evaluator inputs + +AgentOps uses a small set of logical inputs. The same logical input can come from +a static dataset, a live HTTP response, or imported telemetry. + +| Logical input | Meaning | Common source | +|---|---|---| +| `query` | The user prompt. | `row.input` | +| `response` | The target's final answer. | extracted response text | +| `ground_truth` | The expected answer or acceptance criteria. | `row.expected` | +| `response field` | Any value extracted through `response_fields`. | `$response.` | +| `context` | Retrieved chunks, citations, or grounding text. | `row.context`, `$response.context`, `$retrieved_context`, or `$retrieved_context_items` | +| `tool_calls` | Tools called by the agent. | endpoint response or dataset row | +| `tool_definitions` | Tool schemas available to the agent. | dataset row | +| `trace_id` | Trace lineage for review and troubleshooting. | `$telemetry.trace_id` | + +## Mapping rules + +The mapping rules are intentionally boring: + +1. `input` in the dataset becomes the evaluator `query`. +2. The extracted target answer becomes `response`. +3. `expected` in the dataset becomes `ground_truth`. +4. `context` in the dataset becomes evaluator `context`. +5. For grey-box HTTP, `response_fields.response` supplies the final answer. +6. For grey-box HTTP, `response_fields.context` can supply `$response.context`. +7. `$retrieved_context` and `$retrieved_context_items` expose retrieval context + in the evaluator placeholder format. +8. `$telemetry.trace_id` exposes imported telemetry lineage when it exists. +9. Tool fields are used only when the dataset or response includes tool data. + +For RAG, prefer a live context from the response when the endpoint can return it. +That gives the judge the same evidence the agent used for the answer. Use static +`row.context` when you want a fixed, hand-authored reference context. + +## Examples + +Static dataset row: + +```json +{"input":"What is the refund window?","expected":"Customers can request a refund within 30 days.","context":"Refunds are available for 30 days after purchase."} ``` -## 5.2) Example for risk/safety evaluator (`azure_ai_project`) +Static dataset config: ```yaml -evaluators: - - name: ViolenceEvaluator - source: foundry - enabled: true - config: - kind: builtin - class_name: ViolenceEvaluator - init: - azure_ai_project: ${env:AZURE_AI_FOUNDRY_PROJECT_ENDPOINT} - input_mapping: - query: $prompt - response: $prediction - score_keys: - - violence - - score - -thresholds: - - evaluator: ViolenceEvaluator - criteria: "<=" - value: 2 +response_source: dataset ``` -`GroundednessProEvaluator` follows the same pattern (`azure_ai_project` in `init`). +Use `response_source: dataset` when each row already has a `response`, +`prediction`, `output`, or `answer` value and AgentOps should evaluate that value +instead of calling the target. -## 5.3) Example for agent evaluator (agent payload + tools) +Grey-box HTTP config: ```yaml -evaluators: - - name: ToolCallAccuracyEvaluator - source: foundry - enabled: true - config: - kind: builtin - class_name: ToolCallAccuracyEvaluator - input_mapping: - query: $row.query_messages - response: $row.response_messages - tool_calls: $row.tool_calls - tool_definitions: $row.tool_definitions - score_keys: - - tool_call_accuracy - - score +protocol: http-json +request_field: message +response_fields: + response: output.answer + context: output.retrieval.chunks +``` -thresholds: - - evaluator: ToolCallAccuracyEvaluator - criteria: ">=" - value: 3 +Telemetry import: + +```powershell +agentops telemetry validate prod-rag +agentops telemetry preview prod-rag --rows 10 +agentops telemetry import prod-rag --apply ``` -## 5.4) Example for NLP evaluator (non-LLM) +## Quality judges -```yaml -evaluators: - - name: F1ScoreEvaluator - source: foundry - enabled: true - config: - kind: builtin - class_name: F1ScoreEvaluator - input_mapping: - response: $prediction - ground_truth: $expected - score_keys: - - f1_score - - score +| Evaluator | Typical inputs | Notes | +|---|---|---| +| `CoherenceEvaluator` | `query`, `response` | Checks whether the answer is logically consistent. | +| `FluencyEvaluator` | `response` | Checks language quality. | +| `SimilarityEvaluator` | `query`, `response`, `ground_truth` | Compares the answer with the expected answer. | +| `ResponseCompletenessEvaluator` | `query`, `response`, `ground_truth` | Checks whether the answer covers what was expected. | +| `RelevanceEvaluator` | `query`, `response`, optional `context` | Useful for both chat and RAG quality. | -thresholds: - - evaluator: F1ScoreEvaluator - criteria: ">=" - value: 0.7 -``` +Quality judges need a judge model deployment. Set +`AZURE_OPENAI_DEPLOYMENT` or `AZURE_AI_MODEL_DEPLOYMENT_NAME` when local or +cloud evaluation needs one. -## 6) Cloud Evaluation defaults +## Safety judges -AgentOps provides sensible defaults so you don't need to configure extra environment variables: +| Evaluator | Typical inputs | Notes | +|---|---|---| +| `ViolenceEvaluator` | `query`, `response` | Scores violent content risk. | +| `SexualEvaluator` | `query`, `response` | Scores sexual content risk. | +| `SelfHarmEvaluator` | `query`, `response` | Scores self-harm content risk. | +| `HateUnfairnessEvaluator` | `query`, `response` | Scores hate and unfairness risk. | +| `ProtectedMaterialEvaluator` | `query`, `response` | Checks protected material risk when supported by the SDK. | +| `ContentSafetyEvaluator` | `query`, `response` | Composite safety path when supported by the SDK. | -| Setting | Default | Override | +Safety judges require a Foundry project connection. Use +`AZURE_AI_FOUNDRY_PROJECT_ENDPOINT` or `project_endpoint:` in `agentops.yaml`. + +## Agent judges + +| Evaluator | Typical inputs | Notes | |---|---|---| -| Judge model (AI-assisted evaluators) | A deployment you configure in your project | `AZURE_OPENAI_DEPLOYMENT` or `AZURE_AI_MODEL_DEPLOYMENT_NAME` env var | -| Authentication | `DefaultAzureCredential` (passwordless) | `az login` locally, Managed Identity in Azure | +| `ToolCallAccuracyEvaluator` | `query`, `tool_calls`, `tool_definitions` | Checks whether the expected tools were called. | +| `IntentResolutionEvaluator` | `query`, `response`, `tool_definitions` | Checks whether the agent resolved the user's intent. | +| `TaskAdherenceEvaluator` | `query`, `response`, `tool_definitions` | Checks whether the agent stayed on task. | +| `TaskCompletionEvaluator` | conversation payload | Preview in some SDK versions. | +| `ToolSelectionEvaluator` | tool selection plus tool definitions | Preview in some SDK versions. | +| `ToolInputAccuracyEvaluator` | tool arguments plus tool definitions | Preview in some SDK versions. | -## 7) Known caveats +Agent judges work best when the target returns tool telemetry or the dataset row +contains expected tool calls. If the endpoint cannot expose tool calls, start +with answer quality and RAG judges instead. + +## Local metrics + +| Evaluator | Typical inputs | Notes | +|---|---|---| +| `F1ScoreEvaluator` | `response`, `ground_truth` | Good for exact reference checks. | +| `BleuScoreEvaluator` | `response`, `ground_truth` | Optional text similarity metric. | +| `GleuScoreEvaluator` | `response`, `ground_truth` | Optional text similarity metric. | +| `RougeScoreEvaluator` | `response`, `ground_truth` | Optional summary similarity metric. | +| `MeteorScoreEvaluator` | `response`, `ground_truth` | Optional text similarity metric. | +| `avg_latency_seconds` | elapsed time | AgentOps computes this locally. | + +Local metrics are useful when you want a cheap deterministic signal. They are not +a replacement for human review or RAG-specific judges. + +## Cloud defaults + +AgentOps keeps cloud evaluation setup minimal: + +| Setting | Default | Override | +|---|---|---| +| Authentication | `DefaultAzureCredential` | `az login` locally, managed identity in Azure, or federated identity in CI. | +| Foundry project | `project_endpoint` or `AZURE_AI_FOUNDRY_PROJECT_ENDPOINT` | Set either value before running. | +| Judge model | Project deployment selected by environment | `AZURE_OPENAI_DEPLOYMENT` or `AZURE_AI_MODEL_DEPLOYMENT_NAME`. | +| Publishing | Implicit for `execution: cloud` | `publish: true` for local runs that should upload metrics. | -- Some agent evaluators listed in the latest Foundry docs are preview and can change name/signature. -- Not all preview evaluators have stable Python API docs with full constructor/call signatures at any given time. -- When a signature changes, update the evaluator override list in `agentops.yaml` (no code change is needed in AgentOps core; the runtime is generic). +## Caveats -**Last updated:** 2026-03-02 (UTC) +- Foundry Evaluation SDK preview evaluators can change names or call signatures. +- If the SDK changes an evaluator, keep the docs, catalog, and tests in sync. +- `response_fields.response` is the final answer path for HTTP JSON responses. +- `response_fields.context` is the retrieved context path for RAG evaluation. +- Production trace imports need review before they become blocking release gates. -Because Foundry Evaluation SDK and evaluator signatures evolve (especially preview features), review official docs before production rollout. +**Last updated:** 2026-06-26 (UTC) diff --git a/src/agentops/cli/app.py b/src/agentops/cli/app.py index b40f386..e2a473e 100644 --- a/src/agentops/cli/app.py +++ b/src/agentops/cli/app.py @@ -75,6 +75,9 @@ "for the manual." ) ) +telemetry_app = typer.Typer( + help="Import Azure Monitor telemetry into AgentOps datasets." +) app.add_typer(eval_app, name="eval") app.add_typer(report_app, name="report") app.add_typer(workflow_app, name="workflow") @@ -85,6 +88,7 @@ app.add_typer(init_app, name="init") app.add_typer(assert_app, name="assert") app.add_typer(redteam_app, name="redteam") +app.add_typer(telemetry_app, name="telemetry") log = get_logger(__name__) DEFAULT_REPORT_INPUT = Path(".agentops/results/latest/results.json") @@ -2237,6 +2241,109 @@ def cmd_eval_promote_traces( ) +@telemetry_app.command("validate") +def cmd_telemetry_validate( + name: Annotated[str, typer.Argument(help="Name under telemetry_imports.")], + config: Annotated[ + Optional[Path], + typer.Option("--config", "-c", help="Path to agentops.yaml."), + ] = None, +) -> None: + """Validate a named telemetry import without querying Azure.""" + + from agentops.core.config_loader import load_agentops_config + from agentops.services.telemetry_import import ( + TelemetryImportError, + find_telemetry_import, + validate_telemetry_import, + ) + + try: + cfg = load_agentops_config(_resolve_eval_config_path(config)) + item = find_telemetry_import(cfg, name) + warnings = validate_telemetry_import(item) + except (TelemetryImportError, ValueError) as exc: + typer.echo(_cli_error(str(exc)), err=True) + raise typer.Exit(1) from exc + typer.echo(_cli_ok(f"telemetry import {name!r} is valid")) + for warning in warnings: + typer.echo(_cli_warn(f"warning: {warning}")) + + +@telemetry_app.command("preview") +def cmd_telemetry_preview( + name: Annotated[str, typer.Argument(help="Name under telemetry_imports.")], + rows: Annotated[int, typer.Option("--rows", min=1, help="Maximum rows to preview.")] = 10, + config: Annotated[ + Optional[Path], + typer.Option("--config", "-c", help="Path to agentops.yaml."), + ] = None, +) -> None: + """Query Azure Monitor and print a small dataset preview.""" + + from agentops.core.config_loader import load_agentops_config + from agentops.services.telemetry_import import ( + TelemetryImportError, + find_telemetry_import, + preview_telemetry_import, + render_telemetry_import_preview, + ) + + try: + cfg = load_agentops_config(_resolve_eval_config_path(config)) + item = find_telemetry_import(cfg, name) + preview = preview_telemetry_import(item, rows=rows, apply=False) + except (TelemetryImportError, ValueError) as exc: + typer.echo(_cli_error(str(exc)), err=True) + raise typer.Exit(1) from exc + typer.echo(render_telemetry_import_preview(preview)) + + +@telemetry_app.command("import") +def cmd_telemetry_import( + name: Annotated[str, typer.Argument(help="Name under telemetry_imports.")], + apply: Annotated[ + bool, + typer.Option("--apply", help="Write JSONL rows and manifest."), + ] = False, + rows: Annotated[ + Optional[int], + typer.Option("--rows", min=1, help="Optional maximum rows to import."), + ] = None, + config: Annotated[ + Optional[Path], + typer.Option("--config", "-c", help="Path to agentops.yaml."), + ] = None, +) -> None: + """Import telemetry into the configured JSONL output path.""" + + from agentops.core.config_loader import load_agentops_config + from agentops.services.telemetry_import import ( + TelemetryImportError, + find_telemetry_import, + preview_telemetry_import, + render_telemetry_import_preview, + ) + + if not apply: + typer.echo( + _cli_warn( + "Dry run only. Re-run with --apply to write the JSONL dataset and manifest." + ) + ) + try: + cfg = load_agentops_config(_resolve_eval_config_path(config)) + item = find_telemetry_import(cfg, name) + preview = preview_telemetry_import(item, rows=rows, apply=apply) + except (TelemetryImportError, ValueError) as exc: + typer.echo(_cli_error(str(exc)), err=True) + raise typer.Exit(1) from exc + typer.echo(render_telemetry_import_preview(preview)) + if apply: + typer.echo(_cli_updated(preview.output_path)) + typer.echo(_cli_updated(preview.manifest_path)) + + def _resolve_eval_config_path(config: Path | None) -> Path: if config is not None: return config diff --git a/src/agentops/core/agentops_config.py b/src/agentops/core/agentops_config.py index 57670e8..f7268ec 100644 --- a/src/agentops/core/agentops_config.py +++ b/src/agentops/core/agentops_config.py @@ -70,6 +70,14 @@ #: Dataset shape used by the evaluator runtime or Foundry / azd recipes. DatasetKind = Literal["auto", "single-turn", "multi-turn"] +#: Where the local evaluator runtime gets the response text for each row. +ResponseSource = Literal["agent", "dataset"] + +#: Production telemetry import providers and destinations. +TelemetrySourceProvider = Literal["azure-monitor"] +TelemetryTarget = Literal["application-insights", "log-analytics"] +TelemetryLabelMode = Literal["self-similarity", "pending"] + #: Internal-only literal kept for the publisher dispatch table. Derived from #: ``execution`` + ``publish`` via :meth:`AgentOpsConfig.publish_target`. PublishTarget = Literal["foundry", "foundry_cloud"] @@ -316,6 +324,116 @@ def _url_non_empty(cls, value: Optional[str]) -> Optional[str]: return value +# --------------------------------------------------------------------------- +# Telemetry import configuration +# --------------------------------------------------------------------------- + + +class TelemetryTimeRangeConfig(BaseModel): + """Time window for a telemetry import query. + + Users can either provide explicit ISO-ish ``from``/``to`` timestamps or a + relative ``lookback_days`` window. The service owns final KQL rendering so + users never pass arbitrary query text. + """ + + from_: Optional[str] = Field(None, alias="from") + to: Optional[str] = None + lookback_days: Optional[int] = Field(None, ge=1, le=90) + + model_config = ConfigDict(extra="forbid", populate_by_name=True) + + @model_validator(mode="after") + def _validate_window(self) -> "TelemetryTimeRangeConfig": + explicit = self.from_ is not None or self.to is not None + if explicit and not (self.from_ and self.to): + raise ValueError("telemetry_imports.time_range requires both from and to") + if explicit and self.lookback_days is not None: + raise ValueError("telemetry_imports.time_range cannot mix from/to with lookback_days") + if not explicit and self.lookback_days is None: + self.lookback_days = 7 + return self + + +class TelemetryPrivacyConfig(BaseModel): + """Privacy controls applied before JSONL rows are written.""" + + redact_fields: List[str] = Field( + default_factory=lambda: ["authorization", "api_key", "token", "password", "secret"], + description="Case-insensitive field-name fragments to redact.", + ) + max_field_length: int = Field(4000, ge=100, le=20000) + include_raw: bool = False + + model_config = ConfigDict(extra="forbid") + + +class TelemetryOutputConfig(BaseModel): + """Output paths and labeling mode for generated dataset rows.""" + + path: Path = Field(Path(".agentops") / "data" / "telemetry-import.jsonl") + manifest_path: Optional[Path] = None + label_mode: TelemetryLabelMode = "self-similarity" + + model_config = ConfigDict(extra="forbid") + + +class TelemetryImportConfig(BaseModel): + """Named telemetry import declaration. + + The MVP intentionally keeps this declarative: users choose a supported + source/destination pair, field mappings, filters, privacy settings, and an + output file. The service generates the KQL. + """ + + name: str + source: TelemetrySourceProvider = "azure-monitor" + target: TelemetryTarget + resource_id: Optional[str] = None + workspace_id: Optional[str] = None + application_id: Optional[str] = None + connection_string: Optional[str] = None + time_range: TelemetryTimeRangeConfig = Field(default_factory=TelemetryTimeRangeConfig) + filters: Dict[str, str | List[str]] = Field(default_factory=dict) + fields: Dict[str, str] = Field(default_factory=dict) + privacy: TelemetryPrivacyConfig = Field(default_factory=TelemetryPrivacyConfig) + output: TelemetryOutputConfig = Field(default_factory=TelemetryOutputConfig) + max_rows: int = Field(100, ge=1, le=5000) + + model_config = ConfigDict(extra="forbid") + + @field_validator("name") + @classmethod + def _name_non_empty(cls, value: str) -> str: + value = value.strip() + if not value: + raise ValueError("telemetry_imports.name must be non-empty") + return value + + @field_validator("resource_id", "workspace_id", "application_id", "connection_string") + @classmethod + def _optional_text_non_empty(cls, value: Optional[str]) -> Optional[str]: + if value is None: + return value + value = value.strip() + if not value: + raise ValueError("telemetry_imports resource identifiers must be non-empty") + return value + + @model_validator(mode="after") + def _validate_target_ids(self) -> "TelemetryImportConfig": + if self.target == "log-analytics" and not self.workspace_id: + raise ValueError("telemetry_imports targeting log-analytics require workspace_id") + if self.target == "application-insights" and not ( + self.resource_id or self.application_id or self.connection_string + ): + raise ValueError( + "telemetry_imports targeting application-insights require resource_id, " + "application_id, or connection_string" + ) + return self + + class PromptAgentBootstrap(BaseModel): """Bootstrap defaults for prompt-agent CI/CD when the target Foundry project does not yet contain the seed agent referenced by ``agent``. @@ -631,6 +749,13 @@ class AgentOpsConfig(BaseModel): version: int = Field(..., description="Schema version. Must be 1.") agent: str = Field(..., description="Target identifier (name:version, URL, or model:deployment)") dataset: Path = Field(..., description="Path to a JSONL dataset file") + response_source: ResponseSource = Field( + "agent", + description=( + "Where local eval gets each response. 'agent' invokes the configured " + "target. 'dataset' uses each row's response or prediction value." + ), + ) dataset_kind: DatasetKind = Field( "auto", description=( @@ -693,11 +818,16 @@ class AgentOpsConfig(BaseModel): protocol: Optional[Protocol] = None request_field: Optional[str] = None response_field: Optional[str] = None + response_fields: Dict[str, str] = Field(default_factory=dict) tool_calls_field: Optional[str] = None headers: Dict[str, str] = Field(default_factory=dict) auth_header_env: Optional[str] = None evaluators: Optional[List[EvaluatorOverride]] = None + telemetry_imports: List[TelemetryImportConfig] = Field( + default_factory=list, + description="Named Azure Monitor imports that generate AgentOps JSONL datasets.", + ) rubrics: List[RubricConfig] = Field( default_factory=list, description="Optional context-specific rubric evaluator definitions.", @@ -847,6 +977,7 @@ def _validate_protocol_compat(self) -> "AgentOpsConfig": if kind != "http_json" and ( self.request_field or self.response_field + or self.response_fields or self.tool_calls_field or self.headers or self.auth_header_env diff --git a/src/agentops/pipeline/invocations.py b/src/agentops/pipeline/invocations.py index 0eb590e..23d131e 100644 --- a/src/agentops/pipeline/invocations.py +++ b/src/agentops/pipeline/invocations.py @@ -483,7 +483,7 @@ def _invoke_http_json( ) elapsed = time.perf_counter() - started - response_path = config.response_field or "text" + response_path = config.response_field or config.response_fields.get("response") or "text" response_text = _dot_path(payload, response_path) if response_text is None: for fallback in ("response", "output", "content", "message", "text"): @@ -498,6 +498,13 @@ def _invoke_http_json( if not isinstance(response_text, str): response_text = json.dumps(response_text, ensure_ascii=False) + response_fields: Dict[str, Any] = {} + for name, path in config.response_fields.items(): + value = _dot_path(payload, path) + if value is not None: + response_fields[name] = value + response_fields.setdefault("response", response_text) + tool_calls: Optional[List[Any]] = None if config.tool_calls_field: extracted = _dot_path(payload, config.tool_calls_field) @@ -508,6 +515,7 @@ def _invoke_http_json( response=response_text.strip(), latency_seconds=elapsed, tool_calls=tool_calls, + metadata={"response_fields": response_fields} if response_fields else {}, ) diff --git a/src/agentops/pipeline/orchestrator.py b/src/agentops/pipeline/orchestrator.py index 81dc32e..806c517 100644 --- a/src/agentops/pipeline/orchestrator.py +++ b/src/agentops/pipeline/orchestrator.py @@ -718,7 +718,8 @@ def _evaluate_row( preview = str(row.get("input", "")).strip().replace("\n", " ") if len(preview) > 80: preview = preview[:77] + "..." - progress(f"{label} invoking target: {preview!r}") + action = "using dataset response" if config.response_source == "dataset" else "invoking target" + progress(f"{label} {action}: {preview!r}") expected = row.get("expected") expected_text = str(expected) if expected is not None else None @@ -728,18 +729,21 @@ def _evaluate_row( expected_text=expected_text, ) as item_span: try: - with telemetry.agent_invoke_span( - target="agent" if target.kind.startswith("foundry") else "model", - model=target.deployment, - agent_id=target.raw if target.kind.startswith("foundry") else None, - agent_name=target.name, - agent_version=target.version, - ) as invoke_span: - invocation = invocations.invoke(target, config, row, timeout=timeout) - telemetry.set_agent_invoke_result( - invoke_span, - response_model=target.deployment, - ) + if config.response_source == "dataset": + invocation = _dataset_invocation(row) + else: + with telemetry.agent_invoke_span( + target="agent" if target.kind.startswith("foundry") else "model", + model=target.deployment, + agent_id=target.raw if target.kind.startswith("foundry") else None, + agent_name=target.name, + agent_version=target.version, + ) as invoke_span: + invocation = invocations.invoke(target, config, row, timeout=timeout) + telemetry.set_agent_invoke_result( + invoke_span, + response_model=target.deployment, + ) except Exception as exc: # noqa: BLE001 telemetry.set_eval_item_result(item_span, passed=False) logger.debug("row %d invocation failed: %s", index, exc) @@ -759,11 +763,16 @@ def _evaluate_row( f"({tool_count} tool call(s)); scoring..." ) + response_fields = invocation.metadata.get("response_fields") + evaluator_row = row + if isinstance(response_fields, dict) and response_fields: + evaluator_row = {**row, "response": response_fields} + metrics: List[RowMetric] = [] for evaluator in evaluators: metric = runtime.run_evaluator( evaluator, - row=row, + row=evaluator_row, response=invocation.response, latency_seconds=invocation.latency_seconds, actual_tool_calls=invocation.tool_calls, @@ -814,18 +823,57 @@ def _format_metric(m: RowMetric) -> str: scored = ", ".join(_format_metric(m) for m in metrics) progress(f"{label} scored: {scored}") + result_context = ( + response_fields.get("context") + if isinstance(response_fields, dict) and response_fields.get("context") is not None + else row.get("context") + ) + return RowResult( row_index=index, input=str(row.get("input", "")), expected=row.get("expected"), response=invocation.response, - context=row.get("context"), + context=_context_as_text(result_context), latency_seconds=invocation.latency_seconds, tool_calls=invocation.tool_calls, metrics=metrics, ) +def _dataset_invocation(row: Dict[str, Any]) -> invocations.InvocationResult: + """Build an invocation result from dataset columns without calling a target.""" + + response = row.get("response") + if response is None: + response = row.get("prediction") + if response is None: + raise ValueError( + "response_source: dataset requires each dataset row to contain " + "a response or prediction field" + ) + + tool_calls = row.get("actual_tool_calls") + if tool_calls is None: + tool_calls = row.get("tool_calls") + if tool_calls is not None and not isinstance(tool_calls, list): + tool_calls = None + return invocations.InvocationResult( + response=str(response), + latency_seconds=0.0, + tool_calls=tool_calls, + metadata={"response_source": "dataset"}, + ) + + +def _context_as_text(value: Any) -> Optional[str]: + if value is None: + return None + if isinstance(value, str): + return value + return json.dumps(value, ensure_ascii=False) + + # --------------------------------------------------------------------------- # Aggregation # --------------------------------------------------------------------------- diff --git a/src/agentops/pipeline/runtime.py b/src/agentops/pipeline/runtime.py index b22e4db..ba986b3 100644 --- a/src/agentops/pipeline/runtime.py +++ b/src/agentops/pipeline/runtime.py @@ -195,8 +195,11 @@ def load_evaluators(presets: List[EvaluatorPreset]) -> List[EvaluatorRuntime]: "$prediction": "response", "$expected": "expected", "$context": "context", + "$retrieved_context": "retrieved_context", + "$retrieved_context_items": "retrieved_context_items", "$tool_calls": "tool_calls", "$tool_definitions": "tool_definitions", + "$telemetry.trace_id": "telemetry.trace_id", } @@ -294,21 +297,40 @@ def _resolve_kwargs( response: str, ) -> Dict[str, Any]: resolved: Dict[str, Any] = {} + row_response = row.get("response") merged = {**row, "response": response, "input": row.get("input")} for kwarg, placeholder in mapping.items(): if not isinstance(placeholder, str) or not placeholder.startswith("$"): resolved[kwarg] = placeholder continue - source_key = _PLACEHOLDERS.get(placeholder) - if source_key is None: + source_path = _PLACEHOLDERS.get(placeholder) + if source_path is None and placeholder.startswith("$response."): + if isinstance(row_response, dict): + value = _lookup_placeholder(row_response, placeholder[len("$response."):]) + if value is not None: + resolved[kwarg] = value + continue + source_path = placeholder[1:] + if source_path is None and placeholder.startswith("$telemetry."): + source_path = placeholder[1:] + if source_path is None: raise ValueError(f"unknown evaluator placeholder {placeholder!r}") - value = merged.get(source_key) + value = _lookup_placeholder(merged, source_path) if value is None: continue resolved[kwarg] = value return resolved +def _lookup_placeholder(data: Dict[str, Any], path: str) -> Any: + current: Any = data + for part in path.split("."): + if not isinstance(current, dict): + return None + current = current.get(part) + return current + + def _extract_score(payload: Any, score_key: str) -> Optional[float]: if payload is None: return None diff --git a/src/agentops/services/telemetry_import.py b/src/agentops/services/telemetry_import.py new file mode 100644 index 0000000..286778c --- /dev/null +++ b/src/agentops/services/telemetry_import.py @@ -0,0 +1,550 @@ +"""Import Azure Monitor telemetry into AgentOps JSONL datasets. + +The module has two halves: + +* a pure transformer that maps telemetry rows into AgentOps dataset rows +* a thin Azure Monitor query wrapper with lazy SDK imports + +Users never provide raw KQL. The query builder only accepts structured time +ranges, field mappings, filters, and row limits from ``agentops.yaml``. +""" + +from __future__ import annotations + +import json +import os +import re +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterable, Optional + +from agentops.core.agentops_config import AgentOpsConfig, TelemetryImportConfig + +DEFAULT_MAX_ROWS = 100 +MAX_ROWS_CAP = 5000 + +_DEFAULT_FIELD_CANDIDATES: dict[str, tuple[str, ...]] = { + "input": ( + "input", + "query", + "prompt", + "message", + "user_message", + "customDimensions.input", + "customDimensions.query", + "customDimensions.prompt", + "customDimensions.gen_ai.prompt", + ), + "response": ( + "response", + "prediction", + "output", + "answer", + "completion", + "assistant_message", + "customDimensions.response", + "customDimensions.prediction", + "customDimensions.output", + "customDimensions.gen_ai.completion", + ), + "context": ( + "context", + "retrieved_context", + "grounding", + "customDimensions.context", + "customDimensions.retrieved_context", + "customDimensions.grounding", + ), + "retrieved_context_items": ( + "retrieved_context_items", + "context_items", + "customDimensions.retrieved_context_items", + "customDimensions.context_items", + ), + "tool_calls": ("tool_calls", "customDimensions.tool_calls"), + "trace_id": ("trace_id", "operation_Id", "operationId"), + "turn_id": ("turn_id", "span_id", "id", "customDimensions.turn_id"), + "timestamp": ("timestamp", "TimeGenerated", "time"), +} + +_QUERY_COLUMNS = ( + "timestamp", + "operation_Id = column_ifexists('operation_Id', '')", + "operationId = column_ifexists('operationId', '')", + "id = column_ifexists('id', '')", + "name = column_ifexists('name', '')", + "message = column_ifexists('message', '')", + "duration = column_ifexists('duration', '')", + "success = column_ifexists('success', '')", + "customDimensions = column_ifexists('customDimensions', dynamic({}))", +) + + +class TelemetryImportError(RuntimeError): + """Raised when a telemetry import cannot be validated, queried, or written.""" + + +@dataclass(frozen=True) +class TelemetryImportPreview: + """Result of validating/querying/transforming one telemetry import.""" + + config: TelemetryImportConfig + output_path: Path + manifest_path: Path + rows: list[dict[str, Any]] + skipped: int = 0 + deduped: int = 0 + truncated: bool = False + warnings: list[str] = field(default_factory=list) + + +def find_telemetry_import( + config: AgentOpsConfig, + name: str, +) -> TelemetryImportConfig: + """Return a named telemetry import or raise a friendly error.""" + + for item in config.telemetry_imports: + if item.name == name: + return item + available = ", ".join(item.name for item in config.telemetry_imports) or "none" + raise TelemetryImportError( + f"telemetry import {name!r} was not found in agentops.yaml. " + f"Available imports: {available}." + ) + + +def validate_telemetry_import(config: TelemetryImportConfig) -> list[str]: + """Validate service-level constraints and return non-fatal warnings.""" + + warnings: list[str] = [] + if config.output.label_mode == "self-similarity": + warnings.append( + "Generated rows use production responses as expected values for drift " + "detection, not human-verified ground truth." + ) + return warnings + + +def preview_telemetry_import( + config: TelemetryImportConfig, + *, + rows: Optional[int] = None, + apply: bool = False, +) -> TelemetryImportPreview: + """Query Azure Monitor, transform rows, and optionally write JSONL output.""" + + validate_telemetry_import(config) + raw_rows = query_azure_monitor(config, rows=rows) + preview = transform_telemetry_rows(config, raw_rows, rows=rows) + if apply: + write_telemetry_import(preview) + return preview + + +def transform_telemetry_rows( + config: TelemetryImportConfig, + telemetry_rows: Iterable[dict[str, Any]], + *, + rows: Optional[int] = None, +) -> TelemetryImportPreview: + """Pure transformation from telemetry records to AgentOps dataset rows.""" + + limit = _bounded_rows(rows if rows is not None else config.max_rows) + output_path = config.output.path + manifest_path = config.output.manifest_path or output_path.with_name( + f"{output_path.stem}-manifest.json" + ) + warnings = validate_telemetry_import(config) + converted: list[dict[str, Any]] = [] + skipped = 0 + deduped = 0 + seen: set[tuple[str, str]] = set() + + for raw in telemetry_rows: + if len(converted) >= limit: + break + row = _telemetry_row_to_agentops_row(config, raw) + if row is None: + skipped += 1 + continue + telemetry = row.get("telemetry") + trace_id = "" + turn_id = "" + if isinstance(telemetry, dict): + trace_id = str(telemetry.get("trace_id") or "") + turn_id = str(telemetry.get("turn_id") or "") + key = (trace_id or row["input"], turn_id or row.get("response", "")) + if key in seen: + deduped += 1 + continue + seen.add(key) + converted.append(row) + + truncated = len(converted) >= limit + if not converted: + warnings.append("No telemetry rows contained both input and response text.") + return TelemetryImportPreview( + config=config, + output_path=output_path, + manifest_path=manifest_path, + rows=converted, + skipped=skipped, + deduped=deduped, + truncated=truncated, + warnings=warnings, + ) + + +def write_telemetry_import(preview: TelemetryImportPreview) -> None: + """Write JSONL rows and a small manifest next to the output.""" + + preview.output_path.parent.mkdir(parents=True, exist_ok=True) + with preview.output_path.open("w", encoding="utf-8") as handle: + for row in preview.rows: + handle.write(json.dumps(row, ensure_ascii=False) + "\n") + + trace_ids = [ + str(row.get("telemetry", {}).get("trace_id")) + for row in preview.rows + if isinstance(row.get("telemetry"), dict) and row["telemetry"].get("trace_id") + ] + manifest = { + "version": 1, + "generated_at": datetime.now(timezone.utc).isoformat(), + "import": preview.config.name, + "source": preview.config.source, + "target": preview.config.target, + "output_path": str(preview.output_path), + "rows": len(preview.rows), + "skipped": preview.skipped, + "deduped": preview.deduped, + "truncated": preview.truncated, + "trace_ids": trace_ids, + "warnings": preview.warnings, + } + preview.manifest_path.write_text( + json.dumps(manifest, indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + + +def render_telemetry_import_preview(preview: TelemetryImportPreview) -> str: + """Render concise CLI output.""" + + lines = [ + "AgentOps telemetry import", + f"Import: {preview.config.name}", + f"Target: {preview.config.target}", + f"Output: {preview.output_path}", + "", + "Summary", + f" rows {len(preview.rows)}", + f" skipped {preview.skipped}", + f" deduped {preview.deduped}", + f" truncated {str(preview.truncated).lower()}", + ] + if preview.warnings: + lines.append("") + lines.append("Warnings") + lines.extend(f" - {warning}" for warning in preview.warnings) + if preview.rows: + lines.append("") + lines.append("Sample rows") + for index, row in enumerate(preview.rows[:3], start=1): + lines.append(f" {index}. {str(row.get('input', ''))[:120]}") + return "\n".join(lines) + "\n" + + +def query_azure_monitor( + config: TelemetryImportConfig, + *, + rows: Optional[int] = None, +) -> list[dict[str, Any]]: + """Run the generated KQL against Azure Monitor with lazy SDK imports.""" + + try: + from azure.identity import DefaultAzureCredential # noqa: WPS433 + except ImportError as exc: + raise TelemetryImportError( + "Telemetry import requires Azure authentication packages. Install " + "them with: python -m pip install azure-identity azure-monitor-query" + ) from exc + + kql = build_telemetry_kql(config, rows=rows) + credential = DefaultAzureCredential( + exclude_developer_cli_credential=True, + process_timeout=30, + ) + try: + if config.target == "log-analytics": + from azure.monitor.query import LogsQueryClient # noqa: WPS433 + + client = LogsQueryClient(credential) + workspace_id = _resolve_value(config.workspace_id, "workspace_id") + response = client.query_workspace(workspace_id, kql, timespan=None) + return _flatten_logs_response(response) + if config.resource_id: + from azure.monitor.query import LogsQueryClient # noqa: WPS433 + + client = LogsQueryClient(credential) + resource_id = _resolve_value(config.resource_id, "resource_id") + response = client.query_resource(resource_id, kql, timespan=None) + return _flatten_logs_response(response) + app_id = _application_id(config) + token = credential.get_token("https://api.applicationinsights.io/.default").token + return _query_application_insights(app_id, token, kql) + except ImportError as exc: + raise TelemetryImportError( + "Telemetry import with resource_id/workspace_id requires the Azure " + "Monitor Query SDK. Install it with: python -m pip install " + "azure-monitor-query" + ) from exc + except TelemetryImportError: + raise + except Exception as exc: # noqa: BLE001 + raise TelemetryImportError(f"Azure Monitor query failed: {exc}") from exc + + +def build_telemetry_kql( + config: TelemetryImportConfig, + *, + rows: Optional[int] = None, +) -> str: + """Build safe KQL from structured config only.""" + + limit = _bounded_rows(rows if rows is not None else config.max_rows) + clauses = ["union isfuzzy=true requests, dependencies, traces"] + clauses.append(f"| extend timestamp = {_timestamp_expr()}") + clauses.append(_time_clause(config)) + for key, value in sorted(config.filters.items()): + clauses.append(_filter_clause(key, value)) + columns = ", ".join(_QUERY_COLUMNS) + clauses.append(f"| project {columns}") + clauses.append("| order by timestamp desc") + clauses.append(f"| take {limit}") + return "\n".join(clauses) + + +def _telemetry_row_to_agentops_row( + config: TelemetryImportConfig, + raw: dict[str, Any], +) -> Optional[dict[str, Any]]: + input_text = _mapped_text(config, raw, "input") + response_text = _mapped_text(config, raw, "response") + if not input_text or not response_text: + return None + + label_mode = config.output.label_mode + telemetry = { + "trace_id": _mapped_text(config, raw, "trace_id"), + "turn_id": _mapped_text(config, raw, "turn_id"), + "timestamp": _mapped_text(config, raw, "timestamp"), + "source": config.source, + "target": config.target, + "import": config.name, + } + row: dict[str, Any] = { + "input": _clean_value(input_text, config), + "response": _clean_value(response_text, config), + "prediction": _clean_value(response_text, config), + "expected": _clean_value(response_text, config) if label_mode == "self-similarity" else "", + "telemetry": {k: v for k, v in telemetry.items() if v not in (None, "")}, + "metadata": { + "source": "azure_monitor_telemetry", + "label_mode": label_mode, + "needs_review": True, + }, + } + context = _mapped_value(config, raw, "context") + if context not in (None, "", [], {}): + row["context"] = _clean_value(context, config) + row["retrieved_context"] = row["context"] + context_items = _mapped_value(config, raw, "retrieved_context_items") + if context_items not in (None, "", [], {}): + row["retrieved_context_items"] = _clean_value(context_items, config) + tool_calls = _mapped_value(config, raw, "tool_calls") + if tool_calls not in (None, "", [], {}): + row["tool_calls"] = _clean_value(tool_calls, config) + if config.privacy.include_raw: + row["raw"] = _clean_value(raw, config) + return row + + +def _mapped_text(config: TelemetryImportConfig, raw: dict[str, Any], name: str) -> Optional[str]: + value = _mapped_value(config, raw, name) + if value is None: + return None + if isinstance(value, str): + value = value.strip() + return value or None + if isinstance(value, (dict, list)): + text = json.dumps(value, ensure_ascii=False) + return text if text not in ("{}", "[]") else None + text = str(value).strip() + return text or None + + +def _mapped_value(config: TelemetryImportConfig, raw: dict[str, Any], name: str) -> Any: + mapping = config.fields.get(name) + if mapping: + return _lookup(raw, mapping) + for candidate in _DEFAULT_FIELD_CANDIDATES.get(name, ()): + value = _lookup(raw, candidate) + if value not in (None, "", [], {}): + return value + return None + + +def _lookup(data: dict[str, Any], path: str) -> Any: + current: Any = data + for part in path.split("."): + if not isinstance(current, dict): + return None + current = current.get(part) + return current + + +def _clean_value(value: Any, config: TelemetryImportConfig, key: str = "") -> Any: + lowered = key.lower() + if any(fragment.lower() in lowered for fragment in config.privacy.redact_fields): + return "[redacted]" + if isinstance(value, dict): + return {k: _clean_value(v, config, str(k)) for k, v in value.items()} + if isinstance(value, list): + return [_clean_value(item, config, key) for item in value] + if isinstance(value, str) and len(value) > config.privacy.max_field_length: + return value[: config.privacy.max_field_length] + "...[truncated]" + return value + + +def _flatten_logs_response(response: Any) -> list[dict[str, Any]]: + tables = getattr(response, "tables", None) or [] + if not tables: + return [] + table = tables[0] + columns: list[str] = [] + for column in getattr(table, "columns", None) or []: + name = getattr(column, "name", None) if not isinstance(column, dict) else column.get("name") + if isinstance(name, str): + columns.append(name) + rows: list[dict[str, Any]] = [] + for raw in getattr(table, "rows", None) or []: + rows.append(dict(zip(columns, raw))) + return rows + + +def _application_id(config: TelemetryImportConfig) -> str: + if config.application_id: + return _resolve_value(config.application_id, "application_id") + if config.connection_string: + connection_string = _resolve_value(config.connection_string, "connection_string") + match = re.search(r"ApplicationId=([0-9a-fA-F-]+)", connection_string) + if match: + return match.group(1) + raise TelemetryImportError( + "application-insights imports require resource_id, application_id, or " + "a connection_string containing ApplicationId" + ) + + +def _query_application_insights(app_id: str, bearer: str, kql: str) -> list[dict[str, Any]]: + import json as _json + from urllib import request + + body = _json.dumps({"query": kql}).encode("utf-8") + req = request.Request( + url=f"https://api.applicationinsights.io/v1/apps/{app_id}/query", + data=body, + headers={ + "Authorization": f"Bearer {bearer}", + "Content-Type": "application/json", + }, + method="POST", + ) + with request.urlopen(req, timeout=30) as response: # noqa: S310 + parsed = _json.loads(response.read()) + if isinstance(parsed, dict) and parsed.get("error"): + err = parsed["error"] + message = err.get("message") if isinstance(err, dict) else str(err) + raise TelemetryImportError(f"Application Insights query failed: {message}") + tables = parsed.get("tables") if isinstance(parsed, dict) else None + if not tables: + return [] + table = tables[0] + columns = [column.get("name") for column in table.get("columns", [])] + return [dict(zip(columns, row)) for row in table.get("rows", [])] + + +def _time_clause(config: TelemetryImportConfig) -> str: + tr = config.time_range + if tr.from_ and tr.to: + return ( + f"| where timestamp between (datetime({_kql_string(tr.from_)}) .. " + f"datetime({_kql_string(tr.to)}))" + ) + days = tr.lookback_days or 7 + return f"| where timestamp >= ago({days}d)" + + +def _filter_clause(key: str, value: str | list[str]) -> str: + expr = _safe_column_expr(key) + values = value if isinstance(value, list) else [value] + escaped = ", ".join(_kql_string(str(item)) for item in values) + if len(values) == 1: + return f"| where {expr} == {escaped}" + return f"| where {expr} in ({escaped})" + + +def _safe_column_expr(key: str) -> str: + if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?", key): + raise TelemetryImportError( + f"unsafe telemetry filter field {key!r}; use a column name or customDimensions.name" + ) + if key.startswith("customDimensions."): + subkey = key.split(".", 1)[1] + return ( + "tostring(column_ifexists('customDimensions', dynamic({}))" + f"[{_kql_string(subkey)}])" + ) + return f"tostring(column_ifexists({_kql_string(key)}, ''))" + + +def _timestamp_expr() -> str: + return ( + "coalesce(" + "column_ifexists('timestamp', datetime(null)), " + "column_ifexists('TimeGenerated', datetime(null)), " + "column_ifexists('time', datetime(null))" + ")" + ) + + +def _kql_string(value: str) -> str: + return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'" + + +def _resolve_value(value: Optional[str], label: str) -> str: + if not value: + raise TelemetryImportError(f"telemetry import is missing {label}") + value = value.strip() + env_name: Optional[str] = None + if value.startswith("env:"): + env_name = value[4:] + elif value.startswith("$") and len(value) > 1: + env_name = value[1:].strip("{}") + if env_name: + resolved = os.getenv(env_name) + if not resolved: + raise TelemetryImportError( + f"environment variable {env_name} referenced by {label} is not set" + ) + return resolved + return value + + +def _bounded_rows(rows: int) -> int: + if rows <= 0: + raise TelemetryImportError("rows must be greater than zero") + return min(rows, MAX_ROWS_CAP) diff --git a/tests/unit/test_agentops_config.py b/tests/unit/test_agentops_config.py index 288f56a..5fbc545 100644 --- a/tests/unit/test_agentops_config.py +++ b/tests/unit/test_agentops_config.py @@ -141,6 +141,111 @@ def test_minimal_config(self, tmp_path) -> None: assert cfg.version == 1 assert cfg.agent == "my-rag:3" assert cfg.thresholds == {} + assert cfg.response_source == "agent" + assert cfg.telemetry_imports == [] + + def test_accepts_telemetry_import_config(self) -> None: + cfg = AgentOpsConfig.model_validate( + { + "version": 1, + "agent": "my-rag:3", + "dataset": "./qa.jsonl", + "response_source": "dataset", + "telemetry_imports": [ + { + "name": "prod", + "source": "azure-monitor", + "target": "application-insights", + "resource_id": "$APPINSIGHTS_RESOURCE_ID", + "time_range": {"lookback_days": 14}, + "filters": {"customDimensions.agent": "support"}, + "fields": { + "input": "customDimensions.question", + "response": "customDimensions.answer", + }, + "privacy": {"redact_fields": ["token"], "max_field_length": 500}, + "output": { + "path": ".agentops/data/prod.jsonl", + "label_mode": "pending", + }, + } + ], + } + ) + + item = cfg.telemetry_imports[0] + assert cfg.response_source == "dataset" + assert item.name == "prod" + assert item.source == "azure-monitor" + assert item.target == "application-insights" + assert item.resource_id == "$APPINSIGHTS_RESOURCE_ID" + assert item.time_range.lookback_days == 14 + assert item.output.label_mode == "pending" + + def test_telemetry_import_rejects_unknown_fields(self) -> None: + with pytest.raises(ValidationError): + AgentOpsConfig.model_validate( + { + "version": 1, + "agent": "my-rag:3", + "dataset": "./qa.jsonl", + "telemetry_imports": [ + { + "name": "prod", + "target": "log-analytics", + "workspace_id": "workspace", + "surprise": True, + } + ], + } + ) + + def test_telemetry_import_time_range_requires_one_mode(self) -> None: + with pytest.raises(ValidationError, match="cannot mix"): + AgentOpsConfig.model_validate( + { + "version": 1, + "agent": "my-rag:3", + "dataset": "./qa.jsonl", + "telemetry_imports": [ + { + "name": "prod", + "target": "log-analytics", + "workspace_id": "workspace", + "time_range": { + "from": "2026-06-01T00:00:00Z", + "to": "2026-06-02T00:00:00Z", + "lookback_days": 7, + }, + } + ], + } + ) + + def test_telemetry_import_accepts_explicit_time_range(self) -> None: + cfg = AgentOpsConfig.model_validate( + { + "version": 1, + "agent": "my-rag:3", + "dataset": "./qa.jsonl", + "telemetry_imports": [ + { + "name": "prod", + "target": "log-analytics", + "workspace_id": "workspace", + "time_range": { + "from": "2026-06-01T00:00:00Z", + "to": "2026-06-02T00:00:00Z", + }, + } + ], + } + ) + + time_range = cfg.telemetry_imports[0].time_range + assert time_range.from_ == "2026-06-01T00:00:00Z" + assert time_range.to == "2026-06-02T00:00:00Z" + assert time_range.lookback_days is None def test_resolved_target(self) -> None: cfg = AgentOpsConfig(version=1, agent="my-rag:3", dataset="./qa.jsonl") @@ -472,8 +577,10 @@ def test_http_fields_allowed_for_http_target(self) -> None: dataset="./qa.jsonl", request_field="message", response_field="text", + response_fields={"context": "retrieval.context"}, ) assert cfg.request_field == "message" + assert cfg.response_fields == {"context": "retrieval.context"} def test_http_fields_rejected_for_prompt_agent(self) -> None: with pytest.raises(ValidationError, match="HTTP/JSON"): @@ -481,7 +588,7 @@ def test_http_fields_rejected_for_prompt_agent(self) -> None: version=1, agent="my-rag:3", dataset="./qa.jsonl", - request_field="message", + response_fields={"context": "context"}, ) def test_evaluators_override(self) -> None: diff --git a/tests/unit/test_cli_commands.py b/tests/unit/test_cli_commands.py index 0c6b2a6..3bb6feb 100644 --- a/tests/unit/test_cli_commands.py +++ b/tests/unit/test_cli_commands.py @@ -1,6 +1,7 @@ from typer.testing import CliRunner from agentops.cli.app import app +from agentops.services.telemetry_import import TelemetryImportPreview runner = CliRunner() @@ -83,3 +84,102 @@ def test_agent_command_group_wired() -> None: stripped = _strip_ansi(result.stdout) assert "analyze" in stripped assert "serve" in stripped + + +def test_telemetry_validate_uses_named_import(tmp_path, monkeypatch) -> None: + config = tmp_path / "agentops.yaml" + config.write_text( + "\n".join( + [ + "version: 1", + "agent: support-agent:1", + "dataset: .agentops/data/smoke.jsonl", + "telemetry_imports:", + " - name: prod", + " target: log-analytics", + " workspace_id: workspace", + ] + ), + encoding="utf-8", + ) + monkeypatch.setattr( + "agentops.services.telemetry_import.validate_telemetry_import", + lambda _item: [], + ) + + result = runner.invoke(app, ["telemetry", "validate", "prod", "--config", str(config)]) + + assert result.exit_code == 0, result.output + assert "prod" in result.output + assert "valid" in result.output + + +def test_telemetry_preview_prints_service_preview(tmp_path, monkeypatch) -> None: + config = tmp_path / "agentops.yaml" + config.write_text( + "version: 1\n" + "agent: support-agent:1\n" + "dataset: .agentops/data/smoke.jsonl\n" + "telemetry_imports:\n" + " - name: prod\n" + " target: log-analytics\n" + " workspace_id: workspace\n", + encoding="utf-8", + ) + + def fake_preview(item, *, rows=None, apply=False): + return TelemetryImportPreview( + config=item, + output_path=tmp_path / "prod.jsonl", + manifest_path=tmp_path / "prod-manifest.json", + rows=[{"input": "hello", "response": "world"}], + ) + + monkeypatch.setattr( + "agentops.services.telemetry_import.preview_telemetry_import", + fake_preview, + ) + + result = runner.invoke( + app, + ["telemetry", "preview", "prod", "--rows", "1", "--config", str(config)], + ) + + assert result.exit_code == 0, result.output + assert "AgentOps telemetry import" in result.output + assert "hello" in result.output + + +def test_telemetry_import_requires_apply_to_write(tmp_path, monkeypatch) -> None: + config = tmp_path / "agentops.yaml" + config.write_text( + "version: 1\n" + "agent: support-agent:1\n" + "dataset: .agentops/data/smoke.jsonl\n" + "telemetry_imports:\n" + " - name: prod\n" + " target: log-analytics\n" + " workspace_id: workspace\n", + encoding="utf-8", + ) + calls = [] + + def fake_preview(item, *, rows=None, apply=False): + calls.append(apply) + return TelemetryImportPreview( + config=item, + output_path=tmp_path / "prod.jsonl", + manifest_path=tmp_path / "prod-manifest.json", + rows=[], + ) + + monkeypatch.setattr( + "agentops.services.telemetry_import.preview_telemetry_import", + fake_preview, + ) + + result = runner.invoke(app, ["telemetry", "import", "prod", "--config", str(config)]) + + assert result.exit_code == 0, result.output + assert calls == [False] + assert "Dry run only" in result.output diff --git a/tests/unit/test_http_response_fields.py b/tests/unit/test_http_response_fields.py new file mode 100644 index 0000000..abbdb32 --- /dev/null +++ b/tests/unit/test_http_response_fields.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +from agentops.core.agentops_config import AgentOpsConfig, classify_agent +from agentops.core.evaluators import EvaluatorPreset +from agentops.pipeline import invocations, orchestrator, runtime + + +def test_http_json_captures_named_response_fields(monkeypatch) -> None: + cfg = AgentOpsConfig( + version=1, + agent="https://example.test/chat", + dataset="./qa.jsonl", + protocol="http-json", + request_field="question", + response_fields={ + "response": "output.answer", + "context": "output.context", + "citations": "output.citations", + }, + ) + target = classify_agent(cfg.agent, cfg.protocol) + + def fake_request_json(**_kwargs): + return { + "output": { + "answer": "Use the reset page.", + "context": ["Password reset article"], + "citations": ["password.md"], + } + } + + monkeypatch.setattr(invocations, "_http_request_json", fake_request_json) + + result = invocations.invoke( + target, + cfg, + {"input": "How do I reset my password?"}, + timeout=1, + ) + + assert result.response == "Use the reset page." + assert result.metadata["response_fields"] == { + "response": "Use the reset page.", + "context": ["Password reset article"], + "citations": ["password.md"], + } + + +def test_response_fields_are_available_to_evaluator_mapping(monkeypatch) -> None: + captured: dict[str, object] = {} + + def fake_evaluator(**kwargs): + captured.update(kwargs) + return {"score": 5} + + cfg = AgentOpsConfig( + version=1, + agent="https://example.test/chat", + dataset="./qa.jsonl", + ) + target = classify_agent(cfg.agent, cfg.protocol) + monkeypatch.setattr( + orchestrator.invocations, + "invoke", + lambda *_args, **_kwargs: invocations.InvocationResult( + response="Use the reset page.", + latency_seconds=0.25, + metadata={ + "response_fields": { + "response": "Use the reset page.", + "context": ["Password reset article"], + } + }, + ), + ) + evaluator = runtime.EvaluatorRuntime( + preset=EvaluatorPreset( + name="groundedness", + class_name="GroundednessEvaluator", + score_key="groundedness", + input_mapping={ + "response": "$prediction", + "context": "$response.context", + }, + ), + callable=fake_evaluator, + ) + + row = orchestrator._evaluate_row( + row={"input": "question", "expected": "answer"}, + index=0, + total=1, + target=target, + config=cfg, + evaluators=[evaluator], + timeout=1, + progress=lambda _msg: None, + rules_by_metric={}, + ) + + assert row.response == "Use the reset page." + assert row.context == '["Password reset article"]' + assert captured == { + "response": "Use the reset page.", + "context": ["Password reset article"], + } diff --git a/tests/unit/test_runtime_conversation.py b/tests/unit/test_runtime_conversation.py index 605d2b5..a983e3f 100644 --- a/tests/unit/test_runtime_conversation.py +++ b/tests/unit/test_runtime_conversation.py @@ -9,6 +9,7 @@ from __future__ import annotations from agentops.pipeline.runtime import _build_conversation_messages +from agentops.pipeline import runtime def test_builds_text_only_conversation_when_no_tool_calls() -> None: @@ -130,3 +131,27 @@ def test_skips_calls_without_a_name() -> None: # Only the named call survives, plus the final assistant text. assert len(out["response"]) == 2 assert out["response"][0]["content"][0]["name"] == "f" + + +def test_resolves_retrieval_and_telemetry_placeholders() -> None: + resolved = runtime._resolve_kwargs( + { + "context": "$retrieved_context", + "items": "$retrieved_context_items", + "trace": "$telemetry.trace_id", + "json_text": "$response.raw", + }, + row={ + "input": "q", + "retrieved_context": "doc text", + "retrieved_context_items": [{"id": "doc1"}], + "telemetry": {"trace_id": "trace-123"}, + "response": {"raw": "nested response"}, + }, + response="answer", + ) + + assert resolved["context"] == "doc text" + assert resolved["items"] == [{"id": "doc1"}] + assert resolved["trace"] == "trace-123" + assert resolved["json_text"] == "nested response" diff --git a/tests/unit/test_runtime_dataset_response_source.py b/tests/unit/test_runtime_dataset_response_source.py new file mode 100644 index 0000000..1680e4a --- /dev/null +++ b/tests/unit/test_runtime_dataset_response_source.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from agentops.core.agentops_config import AgentOpsConfig, classify_agent +from agentops.core.evaluators import EvaluatorPreset +from agentops.pipeline import orchestrator, runtime + + +def test_dataset_response_source_does_not_invoke_target(monkeypatch) -> None: + config = AgentOpsConfig( + version=1, + agent="https://example.test/chat", + dataset="./qa.jsonl", + response_source="dataset", + ) + target = classify_agent(config.agent, config.protocol) + latency = EvaluatorPreset( + name="avg_latency_seconds", + class_name="_latency", + score_key="avg_latency_seconds", + input_mapping={}, + ) + + def fail_invoke(*args, **kwargs): + raise AssertionError("target should not be invoked") + + monkeypatch.setattr(orchestrator.invocations, "invoke", fail_invoke) + + row = orchestrator._evaluate_row( + row={"input": "hello", "response": "cached answer", "expected": "cached answer"}, + index=0, + total=1, + target=target, + config=config, + evaluators=[runtime.load_evaluator(latency)], + timeout=1, + progress=lambda _msg: None, + rules_by_metric={}, + ) + + assert row.error is None + assert row.response == "cached answer" + assert row.latency_seconds == 0.0 + assert row.metrics[0].name == "avg_latency_seconds" + assert row.metrics[0].value == 0.0 + + +def test_dataset_response_source_accepts_prediction_field() -> None: + config = AgentOpsConfig( + version=1, + agent="https://example.test/chat", + dataset="./qa.jsonl", + response_source="dataset", + ) + target = classify_agent(config.agent, config.protocol) + latency = EvaluatorPreset( + name="avg_latency_seconds", + class_name="_latency", + score_key="avg_latency_seconds", + input_mapping={}, + ) + + row = orchestrator._evaluate_row( + row={"input": "hello", "prediction": "predicted answer"}, + index=0, + total=1, + target=target, + config=config, + evaluators=[runtime.load_evaluator(latency)], + timeout=1, + progress=lambda _msg: None, + rules_by_metric={}, + ) + + assert row.response == "predicted answer" diff --git a/tests/unit/test_telemetry_import.py b/tests/unit/test_telemetry_import.py new file mode 100644 index 0000000..aa07010 --- /dev/null +++ b/tests/unit/test_telemetry_import.py @@ -0,0 +1,153 @@ +from __future__ import annotations + +import builtins +import json + +import pytest + +from agentops.core.agentops_config import AgentOpsConfig +from agentops.services.telemetry_import import ( + TelemetryImportError, + build_telemetry_kql, + find_telemetry_import, + query_azure_monitor, + transform_telemetry_rows, + write_telemetry_import, +) + + +def _config(**overrides): + data = { + "version": 1, + "agent": "support-agent:1", + "dataset": ".agentops/data/smoke.jsonl", + "telemetry_imports": [ + { + "name": "prod", + "target": "application-insights", + "resource_id": "$APPINSIGHTS_RESOURCE_ID", + "fields": { + "input": "customDimensions.question", + "response": "customDimensions.answer", + "context": "customDimensions.context", + }, + "output": {"path": ".agentops/data/prod.jsonl"}, + **overrides, + } + ], + } + return AgentOpsConfig.model_validate(data).telemetry_imports[0] + + +def test_transform_rows_dedupes_redacts_and_writes_manifest(tmp_path) -> None: + cfg = _config( + output={"path": str(tmp_path / "prod.jsonl")}, + privacy={"redact_fields": ["token"], "max_field_length": 100, "include_raw": True}, + ) + raw = [ + { + "operation_Id": "trace-1", + "id": "turn-1", + "customDimensions": { + "question": "How do I reset my password?", + "answer": "Open account settings.", + "context": "Reset article", + "token": "secret-token", + }, + }, + { + "operation_Id": "trace-1", + "id": "turn-1", + "customDimensions": { + "question": "How do I reset my password?", + "answer": "Open account settings.", + }, + }, + {"customDimensions": {"question": "missing response"}}, + ] + + preview = transform_telemetry_rows(cfg, raw) + write_telemetry_import(preview) + + assert len(preview.rows) == 1 + assert preview.deduped == 1 + assert preview.skipped == 1 + row = preview.rows[0] + assert row["input"] == "How do I reset my password?" + assert row["response"] == "Open account settings." + assert row["expected"] == "Open account settings." + assert row["context"] == "Reset article" + assert row["telemetry"]["trace_id"] == "trace-1" + assert row["raw"]["customDimensions"]["token"] == "[redacted]" + assert (tmp_path / "prod.jsonl").exists() + manifest = json.loads((tmp_path / "prod-manifest.json").read_text(encoding="utf-8")) + assert manifest["rows"] == 1 + assert manifest["deduped"] == 1 + + +def test_build_kql_uses_safe_generated_filters() -> None: + cfg = _config(filters={"customDimensions.agent": ["support", "sales"]}, max_rows=1000) + + kql = build_telemetry_kql(cfg, rows=5) + + assert "union isfuzzy=true requests, dependencies, traces" in kql + assert "| extend timestamp = coalesce(" in kql + assert "column_ifexists('timestamp', datetime(null))" in kql + assert "column_ifexists('TimeGenerated', datetime(null))" in kql + assert "coalesce(timestamp, TimeGenerated)" not in kql + assert "ago(7d)" in kql + assert ( + "tostring(column_ifexists('customDimensions', dynamic({}))['agent']) " + "in ('support', 'sales')" + ) in kql + assert "operation_Id = column_ifexists('operation_Id', '')" in kql + assert "TimeGenerated =" not in kql + assert "| order by timestamp desc" in kql + assert "take 5" in kql + + +def test_build_kql_guards_plain_filter_columns() -> None: + cfg = _config(filters={"name": "agent.response"}) + + kql = build_telemetry_kql(cfg, rows=10) + + assert "tostring(column_ifexists('name', '')) == 'agent.response'" in kql + assert "tostring(name)" not in kql + + +def test_build_kql_rejects_unsafe_filter_field() -> None: + cfg = _config(filters={"name); drop table traces; //": "x"}) + + with pytest.raises(TelemetryImportError, match="unsafe"): + build_telemetry_kql(cfg) + + +def test_find_telemetry_import_reports_available_names() -> None: + cfg = AgentOpsConfig.model_validate( + { + "version": 1, + "agent": "support-agent:1", + "dataset": ".agentops/data/smoke.jsonl", + "telemetry_imports": [ + {"name": "prod", "target": "log-analytics", "workspace_id": "workspace"} + ], + } + ) + + with pytest.raises(TelemetryImportError, match="prod"): + find_telemetry_import(cfg, "missing") + + +def test_query_azure_monitor_reports_missing_sdk(monkeypatch) -> None: + cfg = _config() + original_import = builtins.__import__ + + def fake_import(name, *args, **kwargs): + if name == "azure.identity": + raise ImportError("no azure") + return original_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", fake_import) + + with pytest.raises(TelemetryImportError, match="azure-identity"): + query_azure_monitor(cfg, rows=1)