diff --git a/.codex/environments/environment.toml b/.codex/environments/environment.toml new file mode 100644 index 0000000..a627889 --- /dev/null +++ b/.codex/environments/environment.toml @@ -0,0 +1,6 @@ +# THIS IS AUTOGENERATED. DO NOT EDIT MANUALLY +version = 1 +name = "pipelex-api" + +[setup] +script = "make install" diff --git a/.gitignore b/.gitignore index ae75b69..138fceb 100644 --- a/.gitignore +++ b/.gitignore @@ -51,7 +51,11 @@ temp/ gcp_credentials.json -results/* +.pipelex/storage + +# personal pipelex config files that override the default one +pipelex_super.toml pipelex_override.toml telemetry_override.toml -.pipelex/storage + +.worktreeinclude \ No newline at end of file diff --git a/.pipelex/pipelex.toml b/.pipelex/pipelex.toml index 1455f30..35cc215 100644 --- a/.pipelex/pipelex.toml +++ b/.pipelex/pipelex.toml @@ -203,60 +203,3 @@ traces_dir = ".pipelex/traces" # Configure your own table_name + region in your override file when backend = "dynamodb". table_name = "" region = "" - -#################################################################################################### -# Temporal Config -#################################################################################################### - -[temporal] -# Temporal is opt-in. With Temporal disabled the API runs pipelines in-process -# (POST /v1/execute) and POST /v1/start is unavailable. -# To enable: add a [temporal] section in your pipelex_override.toml with -# is_enabled = true and define a server under [temporal.temporal_config.temporal_server_configs.]. 
-is_enabled = false - -[temporal.worker_config] -workflow_execution_timeout = "1:00:00" -run_timeout = "1:00:00" -task_timeout = "0:00:10" -start_delay = "0:00:00" -rpc_timeout = "1:00:00" - -[temporal.worker_config.retry_policy_config] -initial_interval = "0:00:03" -backoff_coefficient = 2.0 -maximum_interval = "unlimited" -maximum_attempts = 3 -non_retryable_error_types = [ - "ExtractHandleNotFoundError", - "FileNotFoundError", - "ImgGenHandleNotFoundError", - "LLMHandleNotFoundError", - "ModelNotFoundError", - "ValidationError", -] - -[temporal.temporal_config] -selected_server = "local" - -[temporal.temporal_config.temporal_server_configs.local] -description = "default localhost" -target_host = "localhost:7233" -namespace = "default" -api_key_method = "none" -api_key_id = "" - -[temporal.temporal_config.temporal_log_config] -is_workflow_info_on_message = false -is_workflow_info_on_extra = true -is_full_workflow_info_on_extra = false -is_activity_info_on_message = false -is_activity_info_on_extra = true -is_full_activity_info_on_extra = false -is_formatter_enabled = true -is_prefix_enabled = true -managed_loggers = [ - "temporalio.activity", - "temporalio.workflow", - "temporalio.worker._activity", -] diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f47b76..8d18943 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,20 @@ # Changelog +## [Unreleased] + +Orchestrator-agnostic base. `pipelex-api` no longer hard-wires Temporal: the published base names **no** orchestrator and runs every pipeline in-process, while distributed execution (Temporal, Mistral Workflows, …) becomes a deployment *flavor* = base image + exactly one orchestrator plugin + that plugin's activation. `api/` imports no `pipelex.temporal` / `temporalio`. 
+ +### Added + - **`orchestration_mode` deployment config:** a top-level run's backend is read from a new packaged `api.toml` (`orchestration_mode`, default `direct`; `allow_request_orchestration_mode_override`, default `false`), env-layered like the Pipelex config (`api_{env}.toml` / `api_override.toml`). `orchestration_mode` is an **open string token** (core owns `direct`; each orchestrator plugin owns its own, e.g. `temporal`); an unregistered token fails loud at dispatch. `POST /v1/execute`, `POST /v1/start`, and `POST /v1/validate` may each carry a per-request `orchestration_mode` override, honored only when the deployment opts in — otherwise refused with a `403` (`OrchestrationModeOverrideForbidden`). The **delivery** axis (blocking vs fire-and-forget) is endpoint-intrinsic, never configured or requestable. See [Configuration → Orchestration mode](docs/configuration.md). + - **Orchestrator HTTP-error mappers:** the app discovers each installed orchestrator plugin's transport-fault mapper (via the plugin SPI `add_http_error_mapper`) at construction and registers one RFC 7807 handler per mapped exception type — so a plugin's transport fault renders correctly while the base imports no orchestrator SDK. + +### Changed + - **Two orthogonal axes — `orchestration_mode` (open token) + delivery (endpoint-set):** the backend (*which orchestrator*) and the wait-semantics (*whether the caller waits*) are now distinct. `orchestration_mode` names only the backend and is an open string token. Delivery is `DeliveryMode {BLOCKING, FIRE_AND_FORGET}`, threaded into the orchestrator by the endpoint and never received from a caller: `/execute` and `/validate` dispatch `BLOCKING`, `/start` dispatches `FIRE_AND_FORGET`. The conflated `execution_mode` enum (which carried `temporal_blocking` / `temporal_fire_and_forget` / `mistral_native`) is gone. 
+ - **`/execute` dispatches by `orchestration_mode`:** `POST /v1/execute` selects its backend from the resolved `orchestration_mode` — dispatching the locally-built run job through the hub's `OrchestratorRegistry` with `BLOCKING` delivery, symmetric with `/start` — instead of the boot-global pipe-run slot. `orchestration_mode` is the single source of truth for top-level dispatch (`boot_orchestrator` narrows to the execution stack). A per-request `orchestration_mode` override is gated by the same `403` policy as `/start`. The full output is preserved across the dispatch: the orchestrator's JSON-safe result is rehydrated back into the canonical `PipeOutput` the response wraps. + - **`/start` is HONEST about fire-and-forget:** `POST /v1/start` builds the run job locally (preserving `request_id`, `output_multiplicity`, `dynamic_output_concept_ref`, run registration, and telemetry) and dispatches it through the hub's `OrchestratorRegistry` with `FIRE_AND_FORGET` delivery. It checks the resolved orchestrator's `supports_fire_and_forget` capability **before** loading a library: an async-capable backend (`temporal`, contributed by the `pipelex-temporal` plugin) acks `202` with its `workflow_id`, while the in-process `direct` base is blocking-only and is refused with a `400` (`StartRequiresAsyncOrchestration`) — use `/execute` — instead of silently running blocking and acking. + - **`/validate` dispatches by `orchestration_mode` (verdict-as-value):** like `/start`, `/validate` resolves the deployment's `orchestration_mode` and dispatches through the hub's `BundleValidatorRegistry`, returning the verdict as a value — a 200 valid `PipelexValidationReport` or a 200 invalid `ErrorReport` (no exception-as-control-flow at the route). Validation is inherently blocking, so there is no delivery axis here. 
`direct` validates **in-process** on the API side; a `temporal` mode dispatches the whole job to a **worker** (contributed by the `pipelex-temporal` plugin) and assembles the same canonical report API-side from the worker's result. A per-request `orchestration_mode` override is gated by the same policy as `/start` (`403` when forbidden). The verdict wire is byte-identical across backends. Size in-process `/validate` traffic for the library load; on a worker-dispatched flavor the library work happens worker-side. + - **Dependencies:** dropped the `temporal` extra from `pipelex` (`pipelex[mistralai,anthropic,google,google-genai,bedrock,fal]`). The base depends on no orchestrator plugin. + ## [v0.5.0] - 2026-06-18 ### Added diff --git a/api/api.toml b/api/api.toml new file mode 100644 index 0000000..4e63531 --- /dev/null +++ b/api/api.toml @@ -0,0 +1,21 @@ +# Pipelex-API deployment config — the packaged default, loaded by `load_api_config()` +# (api/api_config.py) via core's env-aware `load_plugin_config` and validated into `ApiConfig`. +# Keys live at the file root (no `[api]` wrapper) — `load_plugin_config` validates the whole merged +# document against the schema, exactly like `pipelex-temporal`'s `temporal.toml`. The packaged +# default here is deep-merged with an optional `api_{env}.toml` / `api_override.toml` at `~/.pipelex` +# then the project `.pipelex` (env selected by `PIPELEX_ENV`). This open-source base names NO +# orchestrator: it ships the in-process `direct` mode and refuses per-request override. A deployment +# flavor bakes its own `.pipelex/api_{env}.toml` to flip this (e.g. `pipelex-api-hosted` sets +# `orchestration_mode = "temporal"`). + +# Which orchestrator a top-level run dispatches to, through the orchestrator registry. +# `orchestration_mode` is an OPEN string token: core owns "direct" (in-process); each orchestrator +# plugin owns its own ("temporal" from pipelex-temporal, "mistralai-workflows" from +# pipelex-mistralai-workflows). 
A token whose plugin is not installed fails loud at dispatch with the +# plugin's install hint. The delivery axis (blocking vs fire-and-forget) is NOT configured here — it +# is set by the endpoint (`/execute` and `/validate` block; `/start` is fire-and-forget). +orchestration_mode = "direct" + +# Whether a caller may override `orchestration_mode` per request. Off on the base (and recommended off +# on hosted flavors): a locked-down distributed runner must not be coercible into `direct`. +allow_request_orchestration_mode_override = false diff --git a/api/api_config.py b/api/api_config.py new file mode 100644 index 0000000..04bae00 --- /dev/null +++ b/api/api_config.py @@ -0,0 +1,109 @@ +"""Pipelex-API deployment config: the top-level orchestration mode + override policy. + +The runner is orchestrator-agnostic. WHICH orchestrator a top-level run dispatches +to — ``direct`` in-process (the base default), ``temporal``, ``mistralai-workflows``, +… — is a *deployment* choice, never a property of this open-source base. +``orchestration_mode`` is an open string token (core owns ``"direct"``; each plugin +owns its own); the *delivery* axis (blocking vs fire-and-forget) is endpoint-set, not +configured here. It is read from a packaged ``api.toml`` (keys at the file root — no +``[api]`` wrapper, since :meth:`load_plugin_config` validates the whole document +against the schema, exactly like ``temporal.toml``), env-layered like the main pipelex +config and every plugin config (D2): the packaged default ``api.toml`` (shipped in this +wheel) is deep-merged with the env-selected ``api_{environment}.toml`` and +``api_override.toml`` from ``~/.pipelex`` then the project ``.pipelex``, with +``PIPELEX_ENV`` (``runtime_manager.environment``) choosing the env file. One image +bakes every env file; a deployment flavor (e.g. ``pipelex-api-hosted``) bakes +``.pipelex/api_{env}.toml`` to flip the default. The base names no orchestrator and +ships ``orchestration_mode = "direct"``. 
+ +Why a separate ``api.toml`` and not the core ``pipelex_{env}.toml``: core's +config is ``extra="forbid"``, so an ``[api]`` section there is rejected at load. +Loading it via core's reusable :meth:`load_plugin_config` keeps this a pure +pipelex-api concern while reusing the identical env-layering machinery — and +keeps it symmetric with how ``pipelex-temporal`` self-loads ``temporal.toml``. +""" + +from functools import cache +from pathlib import Path + +from pipelex.system.configuration.config_loader import config_manager +from pydantic import BaseModel, ConfigDict + +from api.error_types import ErrorType +from api.errors import raise_forbidden + +API_CONFIG_NAME = "api" + +# The packaged default ``api.toml`` ships in the wheel alongside this module. +_PACKAGE_DIR = Path(__file__).resolve().parent + + +class ApiConfig(BaseModel): + """The ``api.toml`` deployment config (root-level keys): default orchestration mode + override policy. + + No field defaults — the packaged ``api.toml`` is the single source of the + base defaults (mirroring core's "defaults live in the TOML, never in the + model" discipline). ``extra="forbid"`` so a typo'd key in a baked override + fails loud at load instead of being silently ignored. + + ``orchestration_mode`` is an open string token (core owns ``"direct"``; each + plugin owns its own). It is NOT validated against a closed enum here — an + unregistered token is refused at dispatch by ``MissingOrchestratorError``, + the single validation point. The delivery axis (blocking vs fire-and-forget) + is endpoint-set, never configured, so nothing about wait-semantics lives here. + """ + + model_config = ConfigDict(extra="forbid") + + orchestration_mode: str + allow_request_orchestration_mode_override: bool + + +def load_api_config() -> ApiConfig: + """Load the deployment config from ``api.toml`` with env-aware layering (D2). + + Delegates to core's reusable plugin-config loader: the packaged ``api.toml`` + is deep-merged with the env-selected overrides. 
The packaged default alone is + a valid, fully-resolved config — every override tier is optional. Requires + Pipelex to be booted (``runtime_manager.environment`` must be resolved), so + it is called only after ``Pipelex.make`` — never at import. + """ + return config_manager.load_plugin_config(name=API_CONFIG_NAME, package_dir=_PACKAGE_DIR, schema=ApiConfig) + + +@cache +def get_api_config() -> ApiConfig: + """Process-cached :class:`ApiConfig`. + + The config is immutable for the life of the process (``PIPELEX_ENV`` is fixed + at boot), so it is loaded once and cached. ``api.main`` warms this at startup + so a malformed ``api.toml`` / baked override fails the app fast — the same + fail-fast posture as ``ERROR_DISCLOSURE``. Tests that need a different mode + patch this getter (or call :func:`resolve_orchestration_mode` with a hand-built + config) rather than mutating the cache. + """ + return load_api_config() + + +def resolve_orchestration_mode(requested: str | None, *, config: ApiConfig) -> str: + """Resolve the effective orchestration mode for a top-level run, applying policy. + + The deployment default (``config.orchestration_mode``) wins unless the caller + supplied a *different* token AND the deployment opted into per-request + override (``allow_request_orchestration_mode_override``). A caller-supplied + token equal to the default is always honored (it changes nothing). A caller + trying to FORCE a different mode on a runner whose policy forbids it is refused + with a 403 — so a locked-down Temporal runner can never be coerced into + ``direct`` (whose whole point would be to bypass distributed execution), and + vice versa. The token is a plain string compare; an *unregistered* token is + not rejected here — that surfaces at dispatch as ``MissingOrchestratorError``. 
+ """ + if requested is None or requested == config.orchestration_mode: + return config.orchestration_mode + if config.allow_request_orchestration_mode_override: + return requested + msg = ( + f"This deployment does not allow overriding orchestration_mode per request " + f"(configured mode '{config.orchestration_mode}', requested '{requested}')." + ) + raise_forbidden(msg, error_type=ErrorType.ORCHESTRATION_MODE_OVERRIDE_FORBIDDEN) diff --git a/api/error_types.py b/api/error_types.py index c69180d..eeda982 100644 --- a/api/error_types.py +++ b/api/error_types.py @@ -17,10 +17,21 @@ class ErrorType(StrEnum): # Authentication / authorization UNAUTHENTICATED = "Unauthenticated" FORBIDDEN = "Forbidden" + # A caller asked to run in an orchestration_mode this deployment forbids overriding + # (per-request override is off — see `allow_request_orchestration_mode_override` in api.toml). + # A 403: the deployment policy refuses to honor the requested mode. + ORCHESTRATION_MODE_OVERRIDE_FORBIDDEN = "OrchestrationModeOverrideForbidden" INVALID_TOKEN = "InvalidToken" TOKEN_EXPIRED = "TokenExpired" SERVER_MISCONFIGURED = "ServerMisconfigured" + # A caller hit `/start` on a deployment whose resolved orchestration mode cannot do genuine + # async (its orchestrator's `supports_fire_and_forget` is False — e.g. the in-process `direct` + # base). `/start` is fire-and-forget by nature, so rather than silently running blocking and + # acking, it refuses HONESTLY with a 400: use `/execute` (synchronous) instead. Checked AFTER + # the override policy, so a forbidden per-request override still 403s first. 
+ START_REQUIRES_ASYNC_ORCHESTRATION = "StartRequiresAsyncOrchestration" + # Request validation BAD_REQUEST = "BadRequest" VALIDATION_ERROR = "ValidationError" @@ -38,7 +49,8 @@ class ErrorType(StrEnum): # Misc PACKAGE_NOT_FOUND = "PackageNotFound" # The `error_type` for the catch-all 500 emitted by `handle_unexpected_error` - # (any failure that is neither a `PipelexError`, a `TemporalError`, nor an - # `ApiError`). Stays in this enum so the same `build_problem_document_from_api_error` + # (any failure matched by no more-specific handler — not an `ApiError`, a + # `RequestValidationError`, a `PipelexError`, or an orchestrator plugin's mapped + # transport exception). Stays in this enum so the same `build_problem_document_from_api_error` # builder renders it — same shape as every other API-authored 500. INTERNAL_SERVER_ERROR = "InternalServerError" diff --git a/api/exception_handlers.py b/api/exception_handlers.py index db6adeb..f0a0fab 100644 --- a/api/exception_handlers.py +++ b/api/exception_handlers.py @@ -5,10 +5,14 @@ (raised by the `api.errors` 4xx/5xx helpers) carries a pre-built problem document; a FastAPI `RequestValidationError` from automatic request validation is rendered into one; every `PipelexError` is turned into a -problem document built from its `ErrorReport`; bare `temporalio` transport -errors get an API-authored classification; everything else collapses to a -sanitized 500. Routes therefore no longer need to catch and shape errors -themselves. +problem document built from its `ErrorReport`; each orchestrator plugin's +transport-fault mapper (contributed through the plugin SPI's +`add_http_error_mapper`, discovered at app construction) gets its own handler +rendering the `ErrorReport` it produces; everything else collapses to a +sanitized 500. 
The base names no orchestrator and imports no orchestrator SDK +— the Temporal transport classification that used to live here is now owned by +the `pipelex-temporal` plugin and reaches us only as a mapper. Routes +therefore no longer need to catch and shape errors themselves. This module is deliberately import-side-effect-free — no env var reads, no app construction. `api/main.py` calls `register_exception_handlers(app, @@ -23,6 +27,7 @@ import math import re +from collections.abc import Awaitable, Callable from typing import TYPE_CHECKING, Any, cast from fastapi import FastAPI, Request, Response @@ -30,16 +35,18 @@ from fastapi.responses import JSONResponse from pipelex import log from pipelex.base_exceptions import DisclosureMode, ErrorDomain, ErrorReport, PipelexError -from temporalio.exceptions import TemporalError +from pipelex.plugins.registrar import HttpErrorMapperFn from api.error_types import ErrorType -from api.error_uri import error_type_uri from api.errors import ApiError from api.problem_document import PROBLEM_JSON_MEDIA_TYPE, build_problem_document, build_problem_document_from_api_error if TYPE_CHECKING: from api.security import RequestUser +# A Starlette/FastAPI async exception handler: `(request, exc) -> response`. +_ExceptionHandler = Callable[[Request, Exception], Awaitable[Response]] + def _request_id_of(request: Request) -> str | None: """Return the correlation id `RequestIdMiddleware` stored on the request. @@ -332,8 +339,8 @@ def _http_status_for(report: ErrorReport) -> int: def _problem_response(report: ErrorReport, *, request: Request, disclosure_mode: DisclosureMode) -> JSONResponse: """Build the RFC 7807 `JSONResponse` for an `ErrorReport` and log the entry. - Shared by the `PipelexError` and `TemporalError` handlers: both produce an - `ErrorReport`, so both render and log it identically. 
The report is first + Shared by the `PipelexError` handler and every orchestrator-mapper handler: + both produce an `ErrorReport`, so both render and log it identically. The report is first made JSON-safe (see `_json_safe_report`); the status code then comes from `_http_status_for(report)` (the report's own mapping plus any API-layer override registered in ``_ERROR_TYPE_STATUS_OVERRIDES``), and the response @@ -366,44 +373,49 @@ def _problem_response(report: ErrorReport, *, request: Request, disclosure_mode: async def handle_pipelex_error(request: Request, exc: Exception, *, disclosure_mode: DisclosureMode) -> Response: """Translate any pipelex `PipelexError` into an RFC 7807 problem response. - The single place in the API that consumes an `ErrorReport`. + The single place in the API that consumes a `PipelexError`'s `ErrorReport`. `to_error_report()` walks the `__cause__` chain, so a wrapper exception still surfaces the classification of the underlying failure. `exc` is typed `Exception` to match Starlette's handler contract; FastAPI only routes a - `PipelexError` here, so the cast is sound. `WorkflowExecutionError` — a - Temporal workflow failure observed on the submitter side — is a - `PipelexError` (via `TemporalFlowError`) and is unrelated to `temporalio`'s - `TemporalError`, so Starlette's MRO walk resolves it here, never to the - `TemporalError` handler. + `PipelexError` here, so the cast is sound. An orchestrator's workflow + failure that is itself a `PipelexError` (e.g. the Temporal plugin's + `WorkflowExecutionError`, which already carries a structured `ErrorReport`) + resolves here via Starlette's MRO walk; only a *bare* orchestrator-SDK + transport error (no `PipelexError` in its MRO) is routed to that plugin's + own mapper-backed handler instead. 
""" report = cast("PipelexError", exc).to_error_report() return _problem_response(report, request=request, disclosure_mode=disclosure_mode) -async def handle_temporal_error(request: Request, exc: Exception, *, disclosure_mode: DisclosureMode) -> Response: - """Translate a bare temporalio `TemporalError` into an RFC 7807 problem response. +def _make_orchestrator_error_handler(mapper: HttpErrorMapperFn, *, disclosure_mode: DisclosureMode) -> "_ExceptionHandler": + """Wrap one plugin-contributed error mapper into a FastAPI exception handler. - These are transport-level failures of the Temporal client — cluster - unreachable, an RPC error during workflow dispatch — that surface before - pipelex's `@convert_pipelex_errors` wrapping applies. pipelex deliberately - does not classify them, so the API authors the `ErrorReport` itself: a - retryable, `RUNTIME`-domain `transient` failure. (`WorkflowExecutionError` - IS a `PipelexError`, so it is handled by `handle_pipelex_error` instead.) + The plugin owns the *classification* (it maps its bare transport/runtime + exception — e.g. `temporalio.TemporalError` — to a structured `ErrorReport`); + core owns the *transport* exception type (resolved lazily by the registrar); + the API owns the *presentation* (the same RFC 7807 + `DisclosureMode` + rendering every `ErrorReport` gets via `_problem_response`). This is what + lets the base render an orchestrator's transport fault correctly while + naming — and importing — no orchestrator SDK. `exc` is typed `Exception` to + match Starlette's handler contract; FastAPI only routes the mapper's + registered `exc_type` here. 
""" - report = ErrorReport( - error_type="TemporalTransportError", - message=str(exc), - title="Temporal transport error", - type_uri=error_type_uri("TemporalTransportError"), - error_category="transient", - error_domain=ErrorDomain.RUNTIME, - retryable=True, - ) - return _problem_response(report, request=request, disclosure_mode=disclosure_mode) + + async def _handler(request: Request, exc: Exception) -> Response: + report = mapper(exc) + return _problem_response(report, request=request, disclosure_mode=disclosure_mode) + + return _handler async def handle_unexpected_error(request: Request, exc: Exception) -> Response: - """Catch-all for any failure that is neither a `PipelexError` nor a `TemporalError`. + """Catch-all for any failure matched by no more-specific handler. + + Covers anything that is not an `ApiError`, a `RequestValidationError`, a + `PipelexError`, or an orchestrator plugin's mapped transport exception — so on + the orchestrator-agnostic base (which installs no mapper), an *unmapped* bare + transport/runtime error collapses here to a sanitized 500. One of the two `except Exception`-equivalent sites the project sanctions — the outermost handler of an API. The response body is fully sanitized: no @@ -522,37 +534,53 @@ async def handle_request_validation_error(request: Request, exc: Exception) -> R return JSONResponse(status_code=422, content=document, media_type=PROBLEM_JSON_MEDIA_TYPE) -def register_exception_handlers(app: FastAPI, *, disclosure_mode: DisclosureMode = DisclosureMode.VERBOSE) -> None: +def register_exception_handlers( + app: FastAPI, + *, + disclosure_mode: DisclosureMode = DisclosureMode.VERBOSE, + http_error_mappers: dict[type[Exception], HttpErrorMapperFn] | None = None, +) -> None: """Register the app-level exception handlers on `app`. 
Resolution is most-specific-first: an API-authored `ApiError` → `handle_api_error`; a FastAPI `RequestValidationError` (automatic request-body / parameter validation) → `handle_request_validation_error`; a - `PipelexError` (including `WorkflowExecutionError`) → `handle_pipelex_error`; - a non-pipelex `TemporalError` → `handle_temporal_error`; anything else → - `handle_unexpected_error`. Registering `RequestValidationError` overrides - FastAPI's built-in handler so its automatic-validation failures answer in - the same `application/problem+json` shape as every other error, not the + `PipelexError` (including an orchestrator plugin's `PipelexError`-derived + workflow failure) → `handle_pipelex_error`; a bare orchestrator-SDK transport + error → that plugin's mapper-backed handler (see `http_error_mappers`); + anything else → `handle_unexpected_error`. Registering `RequestValidationError` + overrides FastAPI's built-in handler so its automatic-validation failures answer + in the same `application/problem+json` shape as every other error, not the default `{"detail": [...]}`. Shared by the production app and by the unit tests, which register the same handlers on a throwaway app. - `disclosure_mode` is captured by the two handlers that actually render a - pipelex `ErrorReport` (`handle_pipelex_error`, `handle_temporal_error`) via - the thin closures below — production passes the startup-resolved value - (`api.main.ERROR_DISCLOSURE_MODE`); tests pass whatever the test needs and - the default (`VERBOSE`) covers the common case. The other three handlers - don't render an `ErrorReport`, so they don't need the mode and register - directly. + `http_error_mappers` is the `{exc_type: to_error_report}` map an orchestrator + plugin contributes through the plugin SPI (`PluginRegistrar.add_http_error_mapper`, + read back via `get_http_error_mappers`). 
The caller resolves it at app + construction — `api.main` builds the registrar and passes it; the base resolves + an empty map (it installs no orchestrator plugin), so no transport handler is + registered and the only fallback for an unclassified failure stays the sanitized + 500. Each entry registers one handler that runs the mapper and renders the + resulting `ErrorReport` through the shared RFC 7807 + disclosure path — so core + and the API name no orchestrator SDK. Keeping the parameter explicit (rather than + calling `build_registrar` in here) preserves this module's import-side-effect-free + contract: tests register handlers on a throwaway app without a booted Pipelex, and + a test contributes a synthetic mapper to prove the seam without installing a plugin. + + `disclosure_mode` is captured by the handlers that render a pipelex `ErrorReport` + (`handle_pipelex_error` and every mapper handler) via the closures below — + production passes the startup-resolved value (`api.main.ERROR_DISCLOSURE_MODE`); + tests pass whatever the test needs and the default (`VERBOSE`) covers the common + case. The other three handlers don't render an `ErrorReport`, so they don't need + the mode and register directly. 
""" async def _pipelex_error(request: Request, exc: Exception) -> Response: return await handle_pipelex_error(request, exc, disclosure_mode=disclosure_mode) - async def _temporal_error(request: Request, exc: Exception) -> Response: - return await handle_temporal_error(request, exc, disclosure_mode=disclosure_mode) - app.add_exception_handler(ApiError, handle_api_error) app.add_exception_handler(RequestValidationError, handle_request_validation_error) app.add_exception_handler(PipelexError, _pipelex_error) - app.add_exception_handler(TemporalError, _temporal_error) + for exc_type, mapper in (http_error_mappers or {}).items(): + app.add_exception_handler(exc_type, _make_orchestrator_error_handler(mapper, disclosure_mode=disclosure_mode)) app.add_exception_handler(Exception, handle_unexpected_error) diff --git a/api/main.py b/api/main.py index c6e5f0d..88aad44 100644 --- a/api/main.py +++ b/api/main.py @@ -15,10 +15,15 @@ from fastapi.middleware.cors import CORSMiddleware from mthds.protocol.protocol import PROTOCOL_VERSION from pipelex.pipelex import Pipelex +from pipelex.plugins.discovery import build_registrar +from pipelex.plugins.registrar import HttpErrorMapperFn +from pipelex.system.configuration.config_loader import config_manager +from pipelex.system.configuration.configs import PipelexConfig from pipelex.system.environment import get_optional_env from pipelex.system.runtime import IntegrationMode from starlette.middleware.base import BaseHTTPMiddleware +from api.api_config import get_api_config from api.disclosure import resolve_disclosure_mode from api.exception_handlers import register_exception_handlers from api.middleware import RequestIdMiddleware, request_body_size_middleware @@ -32,6 +37,13 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None]: Pipelex.make(integration_mode=IntegrationMode.FASTAPI) try: + # Warm (and validate) the [api] deployment config now that Pipelex is booted, so a malformed + # `api.toml` / baked override fails the app at 
startup rather than on the first /start — the + # same fail-fast posture as ERROR_DISCLOSURE above. Must run after `Pipelex.make` since the + # loader reads `runtime_manager.environment`. Inside the `try` so a raise here still tears the + # singleton down via the `finally` — otherwise `Pipelex.make` leaves a live, ready singleton + # registered and a startup retry / second app in this process fails as "already initialized". + get_api_config() yield finally: Pipelex.teardown_if_needed() @@ -63,6 +75,27 @@ def _resolve_cors_origins() -> tuple[list[str], bool]: ERROR_DISCLOSURE_MODE = resolve_disclosure_mode() +def _resolve_http_error_mappers() -> dict[type[Exception], HttpErrorMapperFn]: + """Resolve the orchestrator plugins' HTTP-error mappers for this deployment. + + Runs the pure, repeatable `build_registrar` against the loaded config (the same + standalone pattern `pipelex plugins list` uses) and reads back the + `{exc_type: to_error_report}` map each installed orchestrator plugin contributed + via `PluginRegistrar.add_http_error_mapper`. The base installs no orchestrator + plugin, so this is an empty map and no transport-error handler is registered; a + flavor (e.g. the Temporal one) contributes its plugin's mapper here, and only at + this point — at app construction, where the plugin (and therefore its SDK) is by + definition installed — is the plugin's exc-type provider thunk run. Resolved once + at module import so a duplicate/broken plugin fails the app fast, mirroring the + `ERROR_DISCLOSURE` fail-fast above. 
+ """ + config = PipelexConfig.model_validate(config_manager.load_config()) + return build_registrar(config=config).get_http_error_mappers() + + +HTTP_ERROR_MAPPERS = _resolve_http_error_mappers() + + def _own_version() -> str: """This server package's version — best-effort for app metadata.""" try: @@ -128,7 +161,7 @@ async def root() -> dict[str, str]: return {"message": "Pipelex API"} -register_exception_handlers(fastapi_app, disclosure_mode=ERROR_DISCLOSURE_MODE) +register_exception_handlers(fastapi_app, disclosure_mode=ERROR_DISCLOSURE_MODE, http_error_mappers=HTTP_ERROR_MAPPERS) # RequestIdMiddleware wraps the *entire* FastAPI app — including Starlette's diff --git a/api/routes/pipelex/pipeline.py b/api/routes/pipelex/pipeline.py index 6fb8f9d..083c69f 100644 --- a/api/routes/pipelex/pipeline.py +++ b/api/routes/pipelex/pipeline.py @@ -11,31 +11,37 @@ from kajson.exceptions import KajsonDecoderError from mthds.protocol.exceptions import PipelineRequestError from pipelex.config import get_config -from pipelex.core.interpreter.interpreter import PipelexInterpreter +from pipelex.core.pipes.pipe_output import PipeOutput +from pipelex.hub import get_bundle_validator_registry, get_orchestrator_registry from pipelex.pipe_run.delivery_assignment import DeliveryAssignment, StorageTarget, WebhookTarget +from pipelex.pipe_run.pipe_run_protocol import PipeRunProtocol from pipelex.pipeline.pipeline_response import PipelexRunResultExecute, PipelexRunResultStart, RunState from pipelex.pipeline.pipeline_run_setup import pipeline_run_setup from pipelex.pipeline.runner import PipelexMTHDSProtocol -from pipelex.pipeline.validate_in_process import validate_bundles_in_process -from pipelex.pipeline.validation_report import PipelexValidationReport, build_validation_report +from pipelex.runtime_bridge.delivery_mode import DeliveryMode +from pipelex.runtime_bridge.exceptions import MissingBundleValidatorError, MissingOrchestratorError +from 
pipelex.runtime_bridge.primitives.hydration import hydrate_working_memory from pipelex.system.environment import get_required_env -from pipelex.temporal.tprl_pipe.act_dry_validate import DryValidateArg -from pipelex.temporal.tprl_pipe.dry_validate_dispatch import dispatch_dry_validate -from pipelex.temporal.tprl_pipe.temporal_pipe_run import make_temporal_pipe_run from pydantic import ValidationError from typing_extensions import override +from api.api_config import get_api_config, resolve_orchestration_mode from api.error_types import ErrorType -from api.errors import raise_validation_error +from api.errors import raise_bad_request, raise_validation_error from api.logging_context import get_request_id from api.routes.pipelex.utils import get_current_iso_timestamp -from api.schemas.models import PipelexApiStartRequest, PipelineApiExtras, RunRequest +from api.schemas.models import PipelexApiExecuteRequest, PipelexApiStartRequest, PipelineApiExtras, RunRequest if TYPE_CHECKING: from mthds.protocol.pipe_output import VariableMultiplicity from mthds.protocol.pipeline_inputs import PipelineInputs from mthds.protocol.working_memory import WorkingMemoryAbstract + from pipelex.base_exceptions import ErrorReport from pipelex.core.memory.working_memory import WorkingMemory + from pipelex.pipe_run.pipe_job import PipeJob + from pipelex.pipeline.validation_report import PipelexValidationReport + from pipelex.plugins.orchestrator_registry import OrchestratorProtocol + from pipelex.runtime_bridge.payloads import PipelexPipeRunOutput from api.security import RequestUser @@ -64,14 +70,137 @@ def _completion_signature(pipeline_run_id: str) -> str: ).hexdigest() -class ApiRunner(PipelexMTHDSProtocol): - """API runner that extends PipelexMTHDSProtocol with Temporal-backed `start` and `validate`. +def _pipe_output_from_run_output(run_output: PipelexPipeRunOutput) -> PipeOutput: + """Rehydrate an orchestrator's JSON-safe `PipelexPipeRunOutput` into a typed `PipeOutput`. 
+ + The `OrchestratorRegistry` answers with the JSON-safe boundary payload (the same shape + that crosses the Temporal worker boundary), produced by `serialize_completed_output`. + `/execute` returns the FULL output, so the synchronous path reverses that serialization + here, restoring the rich `PipeOutput` the base `execute` then wraps in + `PipelexRunResultExecute`: + + - `working_memory` is rebuilt via `hydrate_working_memory` — the same routine the + Temporal workers use — so it must run while the run library (hence the scoped + `ClassRegistry`) is still open; the base `execute` keeps it open until after the run + returns, which is exactly this call site. + - `graph_spec` / `tokens_usages` are validated back from their `model_dump(mode="json")` + dumps with `strict=False`: the orchestrator dumped them in JSON mode (e.g. + `GraphSpec.created_at` became an ISO string), and those models are `strict=True`, so a + strict re-validation would reject the string. `strict=False` is the correct tool for + reversing our own trusted JSON dump — it is a round-trip, not untrusted ingest. + + Routing DIRECT through this serialize→rehydrate round-trip is intentional: it keeps + `/execute` on the SAME per-call dispatch seam as `/start` and `/validate` (no per-mode + branch), at the cost of one in-process re-serialization of the working memory. + """ + return PipeOutput.model_validate( + { + "working_memory": hydrate_working_memory(run_output.output_dict), + "pipeline_run_id": run_output.pipeline_run_id, + "graph_spec": run_output.graph_spec_dump, + "graph_assembly_error": run_output.graph_assembly_error, + "tokens_usages": run_output.tokens_usages_dump, + "usage_assembly_error": run_output.usage_assembly_error, + }, + strict=False, + ) + + +class _OrchestratorPipeRun(PipeRunProtocol): + """Adapts a mode-selected orchestrator to the `PipeRunProtocol` the base `execute` drives. 
- Overrides change the BACKEND (in-process vs Temporal dispatch), never the artifact - shapes — every protocol operation answers with the same canonical models as the - local runtime. + `ApiRunner.execute` injects one of these as `self._pipe_run` so the base `execute` retains + ALL of its run lifecycle (library setup/teardown, tracer close, pipeline-manager cleanup, + telemetry, error mapping) while the actual dispatch goes through the per-request + `orchestration_mode`'s orchestrator instead of the boot-global pipe-run slot. The + orchestrator's JSON-safe output is rehydrated back into the rich `PipeOutput` the base expects. + + `delivery` is the endpoint-chosen wait-semantics axis threaded into `orchestrator.run`; + `/execute` always constructs this with `DeliveryMode.BLOCKING` (it returns the full output). """ + def __init__(self, *, orchestrator: OrchestratorProtocol, delivery: DeliveryMode) -> None: + self._orchestrator = orchestrator + self._delivery = delivery + + @override + async def run(self, pipe_job: PipeJob, *, delivery_assignment: DeliveryAssignment | None = None) -> PipeOutput: + run_output = await self._orchestrator.run(pipe_job=pipe_job, delivery_assignment=delivery_assignment, delivery=self._delivery) + return _pipe_output_from_run_output(run_output) + + +class ApiRunner(PipelexMTHDSProtocol): + """API runner that dispatches `execute`, `start`, and `validate` through the per-call plugin registries. + + Every surface resolves the deployment's `orchestration_mode` (config default + optional + per-request override) and dispatches through a per-call hub registry: `execute` runs a + top-level pipe through the `OrchestratorRegistry` with `DeliveryMode.BLOCKING` and returns the + full output, `start` runs one with `DeliveryMode.FIRE_AND_FORGET` through the same registry, + `validate_verdict` produces a validation verdict through the `BundleValidatorRegistry`. 
On the + orchestrator-agnostic base that means `direct` in-process; a `temporal` mode dispatches to a + worker when `pipelex-temporal` is installed and selected. The base names no orchestrator and + imports no orchestrator SDK; a mode with no registered arm fails loud with the matching + `Missing*Error` (carrying the install hint). Delivery is endpoint-intrinsic — the caller never + chooses it; only the backend is resolved per request. Dispatch changes the BACKEND, never the + artifact shapes — every operation answers with the same canonical models as the local runtime. + """ + + @override + async def execute( + self, + pipe_code: str | None = None, + mthds_contents: list[str] | None = None, + inputs: PipelineInputs | WorkingMemoryAbstract[Any] | None = None, + output_name: str | None = None, + output_multiplicity: VariableMultiplicity | None = None, + dynamic_output_concept_ref: str | None = None, + extra: dict[str, Any] | None = None, + delivery_assignment: DeliveryAssignment | None = None, + requested_orchestration_mode: str | None = None, + ) -> PipelexRunResultExecute: + """Execute a method synchronously, dispatching by the resolved `orchestration_mode`. + + Symmetric with `start`: the effective `orchestration_mode` is resolved FIRST (config default + + optional per-request override, so a forbidden override is refused with a 403 before any + library load), then the run is dispatched through the hub's `OrchestratorRegistry` instead + of the boot-global pipe-run slot. `direct` runs in-process on this agnostic base; a + `temporal` mode dispatches the whole job to a worker and awaits it. A mode with no registered + orchestrator fails loud with `MissingOrchestratorError` (carrying the install hint). + + `/execute` is synchronous — it returns the full output — so it dispatches with + `DeliveryMode.BLOCKING` regardless of backend. 
Wait-semantics is endpoint-intrinsic, never + requestable, so there is nothing for the caller to get wrong here (the fire-and-forget axis + is `/start`'s, gated there by an honest capability check). + + The orchestrator is injected as this runner's `_pipe_run` so the inherited base `execute` + keeps the entire run lifecycle (library setup/teardown, tracer close, pipeline-manager + cleanup, telemetry, error mapping); only the dispatch backend and the output rehydration + (`_OrchestratorPipeRun`) change. `requested_orchestration_mode` is the optional per-request + backend override (`PipelineApiExtras.orchestration_mode`). + """ + # Resolve the effective orchestration mode FIRST — a per-request override the deployment + # policy forbids is refused (403) here, before any library load / run registration. Mirrors start(). + orchestration_mode = resolve_orchestration_mode(requested_orchestration_mode, config=get_api_config()) + orchestrator = get_orchestrator_registry().get_optional(mode=orchestration_mode) + if orchestrator is None: + raise MissingOrchestratorError(mode=orchestration_mode) + + # Dispatch the run through the mode-selected orchestrator by injecting it as this runner's + # PipeRun, then delegate to the base execute, which owns the full run lifecycle. The + # ApiRunner is constructed per request, so mutating _pipe_run here is request-scoped. + # `/execute` is synchronous, so delivery is BLOCKING. 
+ self._pipe_run = _OrchestratorPipeRun(orchestrator=orchestrator, delivery=DeliveryMode.BLOCKING) + return await super().execute( + pipe_code=pipe_code, + mthds_contents=mthds_contents, + inputs=inputs, + output_name=output_name, + output_multiplicity=output_multiplicity, + dynamic_output_concept_ref=dynamic_output_concept_ref, + extra=extra, + delivery_assignment=delivery_assignment, + ) + @override async def start( self, @@ -85,9 +214,26 @@ async def start( pipeline_run_id: str | None = None, callback_urls: list[str] | None = None, request_id: str | None = None, + requested_orchestration_mode: str | None = None, ) -> PipelexRunResultStart: """Start a method execution asynchronously without waiting for completion. + Dispatch is orchestrator-agnostic: the rich `PipeJob` is built locally (so + `request_id`, `output_multiplicity`, `dynamic_output_concept_ref`, the run + registration, and telemetry all survive) and then handed to the resolved + `orchestration_mode`'s orchestrator with `DeliveryMode.FIRE_AND_FORGET`, through the + hub's `OrchestratorRegistry`. The base imports no `temporalio` / orchestrator SDK; the + async-capable Temporal arm (returning a `workflow_id` immediately) is contributed by the + `pipelex-temporal` plugin when installed. + + `/start` is genuinely fire-and-forget, so it is HONEST about its capability: the resolved + mode's orchestrator is looked up and its `supports_fire_and_forget` checked BEFORE any + library load. A blocking-only orchestrator (the in-process `direct` base) cannot honor + async delivery, so it is refused with a 400 (`START_REQUIRES_ASYNC_ORCHESTRATION`) — use + `/execute` — rather than silently running blocking and acking. A mode with no registered + orchestrator fails loud with `MissingOrchestratorError` (carrying the install hint), also + before any library load. 
+ `pipeline_run_id` is the client-supplied run identifier — this open-source runner honors it (protocol: implementations MAY decline it, but then MUST 422; we accept it, and `StartAck.pipeline_run_id` echoes it back as authoritative). @@ -97,11 +243,28 @@ async def start( extras are parsed by the route layer, so nothing reaches it — a non-empty value is an in-process misuse and is rejected. `request_id` is an API-layer extra threaded into `JobMetadata.request_id` for log - correlation. + correlation. `requested_orchestration_mode` is the optional per-request backend override + (`PipelineApiExtras.orchestration_mode`); it is resolved against the deployment's + `api.toml` policy and a forbidden override is refused with a 403. """ if extra: msg = f"ApiRunner defines no extension args beyond its named ones; got {sorted(extra)}." raise PipelineRequestError(msg) + # Resolve the effective orchestration mode FIRST — a per-request override the deployment + # policy forbids is refused (403) here. Then look up the orchestrator and check its async + # capability: `/start` is fire-and-forget, so a blocking-only orchestrator (direct on the + # agnostic base) is refused HONESTLY with a 400 instead of silently running blocking and + # acking. Both gates run BEFORE pipeline_run_setup so a doomed request never loads a library. + orchestration_mode = resolve_orchestration_mode(requested_orchestration_mode, config=get_api_config()) + orchestrator = get_orchestrator_registry().get_optional(mode=orchestration_mode) + if orchestrator is None: + raise MissingOrchestratorError(mode=orchestration_mode) + if not orchestrator.supports_fire_and_forget: + msg = ( + f"Orchestration mode '{orchestration_mode}' cannot honor fire-and-forget delivery: /start requires an " + f"async-capable orchestration, and this deployment has none. Use /execute (synchronous) instead." 
+ ) + raise_bad_request(msg, error_type=ErrorType.START_REQUIRES_ASYNC_ORCHESTRATION) created_at = get_current_iso_timestamp() pipelex_inputs: PipelineInputs | WorkingMemory | None = cast("PipelineInputs | WorkingMemory | None", inputs) @@ -143,85 +306,70 @@ async def start( else [], ) - temporal_pipe_run = make_temporal_pipe_run() - workflow_id, _handle = await temporal_pipe_run.start( - pipe_job, - delivery_assignment=delivery_assignment, - ) + # Dispatch the locally-built job through the resolved mode's orchestrator (looked up and + # capability-checked above) with FIRE_AND_FORGET delivery — the same final dispatch + # `run_pipe_via_bridge` performs, but fed the rich PipeJob instead of the lossy + # `PipelexPipeRunInput` (which carries no request_id / output_multiplicity / + # dynamic_output_concept_ref and skips run registration + telemetry). The async-capable arm + # enqueues and returns the workflow_id immediately. + run_output = await orchestrator.run(pipe_job=pipe_job, delivery_assignment=delivery_assignment, delivery=DeliveryMode.FIRE_AND_FORGET) return PipelexRunResultStart( pipeline_run_id=resolved_pipeline_run_id, created_at=created_at, state=RunState.STARTED, - workflow_id=workflow_id, + workflow_id=run_output.workflow_id, ) - @override - async def validate( + async def validate_verdict( self, + *, mthds_contents: list[str], - allow_signatures: bool = False, - extra: dict[str, Any] | None = None, - ) -> PipelexValidationReport: - """Validate MTHDS bundles — protocol `validate`, Temporal-aware backend selection. - - `mthds_sources` is the optional per-content source-threading hook, carried through the - protocol's `extra` extension point (`extra={"mthds_sources": [...]}`): each source lands on + mthds_sources: list[str] | None, + allow_signatures: bool, + requested_orchestration_mode: str | None, + ) -> PipelexValidationReport | ErrorReport: + """Validate MTHDS bundles, returning the verdict as a value (the route maps it to a 200). 
+ + Mode-aware, mirroring `start`: the effective `orchestration_mode` is resolved FIRST (config + default + optional per-request override, so a forbidden override is refused with a 403 + before any library load), then dispatched through the hub's `BundleValidatorRegistry`. + `direct` validates in-process on this agnostic base; a `temporal` mode dispatches the + whole job to a worker (`pipelex-temporal`). A mode with no registered validator fails + loud with `MissingBundleValidatorError` (carrying the install hint). Validation is inherently + blocking, so there is no delivery axis here — the registry holds one validator per mode. + + Returns the verdict, not a raise: the valid `PipelexValidationReport`, or the structured + `ErrorReport` an invalid bundle produces (carrying `validation_errors`). A genuine + no-verdict infra fault propagates to the global problem+json handler (5xx). The route + discriminates on `isinstance(verdict, PipelexValidationReport)`. + + `mthds_sources` is the optional per-content source-threading hook: each source lands on the corresponding `blueprint.source`, so the structured `validation_errors` on a failure — and the `bundle_blueprint` on success — carry a real `source` instead of `None`. The route - maps a length mismatch to a 422 before we get here; absent/`None` keeps the prior - sourceless behavior. - - Temporal disabled: run in-process via `validate_bundles_in_process` directly (the same - orchestrator the inherited local path delegates to — `validate_bundle` + graph arm + - `build_validation_report`, one library window — called here so `mthds_sources` rides - through, which `super().validate` does not thread onto the blueprints). 
- - Temporal enabled: pure dispatch + map (D10) — the whole job (validation sweep, - graph dry-run, and every worker-side artifact: status map, `pending_signatures`, - `pipe_io_contracts`) runs as ONE in-process activity on a worker; this side parses - the blueprints (cheap, no library) and assembles the SAME canonical report via - `build_validation_report` (D14). No API-side library acquisition. - - Either way the result is the canonical `PipelexValidationReport`: a bundle - without a declared `main_pipe` validates fine and simply carries - `graph_spec=None` (D2 — no precondition). + maps a length mismatch to a 422 before we get here; `None` keeps the sourceless behavior. + `library_dirs` is host context the in-process arm needs; a dispatched arm ignores it (the + worker loads its own library). A bundle without a declared `main_pipe` validates fine and + simply carries `graph_spec=None` (D2 — no precondition). """ - mthds_sources: list[str] | None = extra.get("mthds_sources") if extra else None - if not get_config().temporal.is_enabled: - library_dirs = [Path(library_dir) for library_dir in self.library_dirs] if self.library_dirs else None - return await validate_bundles_in_process( - mthds_contents=mthds_contents, - mthds_sources=mthds_sources, - library_dirs=library_dirs, - allow_signatures=allow_signatures, - log_context="API validate", - ) - - # Dispatch FIRST — before any API-side parsing — so every validation failure - # (malformed TOML, factory/wiring errors, strict-mode signature refusals) - # surfaces through the worker's `validate_bundle` cascade with the exact same - # categorized `ValidateBundleError` identity the direct path raises. 
- dry_validate_result = await dispatch_dry_validate( - DryValidateArg(mthds_contents=mthds_contents, mthds_sources=mthds_sources, allow_signatures=allow_signatures) - ) - - # The bundle is known-valid now — parse the blueprints for the report, threading the - # same per-content sources so the success-path `bundle_blueprint.source` matches the - # failure path. Parsing is pure interpretation (no library), so the worker's single - # library load stays the only one in the whole request. - content_sources: list[str | None] = list(mthds_sources) if mthds_sources is not None else [None] * len(mthds_contents) - blueprints = [ - PipelexInterpreter.make_pipelex_bundle_blueprint(mthds_content=content, mthds_source=source) - for content, source in zip(mthds_contents, content_sources, strict=True) - ] - return build_validation_report( - blueprints=blueprints, - pipe_io_contracts=dry_validate_result.pipe_io_contracts, - dry_run_result=dry_validate_result.dry_run_outputs, - pending_signatures=dry_validate_result.pending_signatures, - graph_spec=dry_validate_result.graph_spec, + # Resolve the effective mode FIRST — a per-request override the deployment policy forbids + # is refused (403) here, before any validator dispatch / library load. Mirrors start(). 
+ orchestration_mode = resolve_orchestration_mode(requested_orchestration_mode, config=get_api_config()) + validator = get_bundle_validator_registry().get_optional(mode=orchestration_mode) + if validator is None: + raise MissingBundleValidatorError(mode=orchestration_mode) + library_dirs = [Path(library_dir) for library_dir in self.library_dirs] if self.library_dirs else None + verdict = await validator.validate_bundles( + mthds_contents=mthds_contents, + mthds_sources=mthds_sources, + allow_signatures=allow_signatures, + library_dirs=library_dirs, ) + # The core seam types its valid arm at the protocol-level ValidationReport (a leaf type) + # to stay import-acyclic in core; every registered validator in fact produces the canonical + # PipelexValidationReport. Recover the precise type here — the single narrowing point — so + # the route's `isinstance(verdict, PipelexValidationReport)` yields ErrorReport on the else arm. + return cast("PipelexValidationReport | ErrorReport", verdict) def _decode_body(body: bytes) -> dict[str, Any]: @@ -266,12 +414,13 @@ def _decode_body(body: bytes) -> dict[str, Any]: def _validate_extras(request_data: dict[str, Any]) -> PipelineApiExtras: - """Validate API-server-only fields (pipeline_run_id, callback_urls).""" + """Validate API-server-only fields (pipeline_run_id, callback_urls, orchestration_mode).""" try: return PipelineApiExtras.model_validate( { "pipeline_run_id": request_data.get("pipeline_run_id"), "callback_urls": request_data.get("callback_urls"), + "orchestration_mode": request_data.get("orchestration_mode"), } ) except ValidationError as exc: @@ -346,7 +495,9 @@ async def _parse_request(request: Request) -> tuple[RunRequest, PipelineApiExtra @router.post( "/execute", response_model=PipelexRunResultExecute, - # The body is read through the raw Request (kajson decoding — see + # Documented body = the protocol's RunRequest plus THIS server's own + # `orchestration_mode` extension (the route honors a per-request override). 
The + # body is read through the raw Request (kajson decoding — see # `_parse_request`), so FastAPI cannot infer a typed body parameter; # document it explicitly so the committed OpenAPI artifact (and protocol # conformance tooling) publishes the request schema. @@ -354,17 +505,21 @@ async def _parse_request(request: Request) -> tuple[RunRequest, PipelineApiExtra "x-mthds-protocol": True, "requestBody": { "required": True, - "content": {"application/json": {"schema": RunRequest.model_json_schema()}}, + "content": {"application/json": {"schema": PipelexApiExecuteRequest.model_json_schema()}}, }, }, ) async def execute(request: Request) -> JSONResponse: """Execute a method synchronously and return its full output (MTHDS Protocol `POST /execute`). - Pipelex domain failures propagate untouched: the global `PipelexError` - handler in `api.exception_handlers` turns them into an RFC 7807 problem response. + The backend is selected by the resolved `orchestration_mode` (deployment default + optional + policy-gated per-request override via the `orchestration_mode` extra), symmetric with `/start` — + not by `boot_orchestrator`. `/execute` is synchronous, so it dispatches with `BLOCKING` delivery + regardless of backend (wait-semantics is endpoint-set, never requestable). Pipelex domain + failures propagate untouched: the global `PipelexError` handler in `api.exception_handlers` + turns them into an RFC 7807 problem response. 
""" - run_request, _extras = await _parse_request(request) + run_request, extras = await _parse_request(request) runner = ApiRunner(user_id=_get_user_id(request)) response = await runner.execute( pipe_code=run_request.pipe_code, @@ -373,6 +528,7 @@ async def execute(request: Request) -> JSONResponse: output_name=run_request.output_name, output_multiplicity=run_request.output_multiplicity, dynamic_output_concept_ref=run_request.dynamic_output_concept_ref, + requested_orchestration_mode=extras.orchestration_mode, ) return JSONResponse( content=response.model_dump(mode="json", serialize_as_any=True, by_alias=True), @@ -400,13 +556,22 @@ async def start( request: Request, parsed: Annotated[tuple[RunRequest, PipelineApiExtras], Depends(_parse_request)], ) -> PipelexRunResultStart: - """Start a method asynchronously; returns its pipeline_run_id immediately (MTHDS Protocol `POST /start`). + """Start a method run and return its pipeline_run_id with a 202 ack (MTHDS Protocol `POST /start`). Answers `202 Accepted` with a `StartAck`. A client-supplied `pipeline_run_id` is honored (protocol D11: this runner accepts it; `StartAck.pipeline_run_id` is always authoritative). Pipelex domain failures propagate untouched: the global `PipelexError` handler in `api.exception_handlers` turns them into an RFC 7807 problem response. + + Fire-and-forget is a property of THIS endpoint (its delivery axis), honored only by an + async-capable backend. A deployment configures the backend (`orchestration_mode`) once; `/start` + sets `FIRE_AND_FORGET` delivery and checks the resolved orchestrator can honor it. A Temporal + deployment (`orchestration_mode = "temporal"`) enqueues the run and returns immediately with a + `workflow_id`. 
On the orchestrator-agnostic base (`orchestration_mode = "direct"`, the default) + the in-process orchestrator is blocking-only, so `/start` is HONEST: it refuses with a `400` + (`StartRequiresAsyncOrchestration`) — use `/execute` — rather than silently blocking and acking. + The completion callback (`callback_urls` / storage delivery) fires on the async path. """ run_request, extras = parsed runner = ApiRunner(user_id=_get_user_id(request)) @@ -420,4 +585,5 @@ async def start( pipeline_run_id=extras.pipeline_run_id, callback_urls=extras.callback_urls, request_id=get_request_id(), + requested_orchestration_mode=extras.orchestration_mode, ) diff --git a/api/routes/pipelex/validate.py b/api/routes/pipelex/validate.py index 9c5effd..9f159e8 100644 --- a/api/routes/pipelex/validate.py +++ b/api/routes/pipelex/validate.py @@ -3,10 +3,8 @@ from fastapi import APIRouter from fastapi.responses import JSONResponse from pipelex.base_exceptions import ErrorReport, ValidationErrorItem -from pipelex.pipeline.exceptions import ValidateBundleError from pipelex.pipeline.validation_render import format_validate_markdown, render_invalid_validation_markdown from pipelex.pipeline.validation_report import PipelexValidationReport -from pipelex.temporal.exceptions import WorkflowExecutionError from pipelex.tools.typing.pydantic_utils import empty_list_factory_of from pipelex.types import StrEnum from pydantic import BaseModel, Field, model_validator @@ -65,6 +63,16 @@ class ValidateRequest(MthdsContentsRequest): "part of the verdict contract); the default empty list renders nothing and the response is unchanged." ), ) + orchestration_mode: str | None = Field( + default=None, + description=( + "Optional per-request orchestration-mode (backend) override for the validation dispatch (same plumbing as " + "`/start`). 
An OPEN string token: `direct` validates in-process; a `temporal` mode dispatches the whole " + "job to a worker; any plugin-provided token is accepted and an unregistered one is refused at dispatch. " + "Honored only when the deployment sets `allow_request_orchestration_mode_override = true` in its `api.toml`; " + "otherwise a token that differs from the deployment default is refused with a 403. Omitted → the default." + ), + ) @model_validator(mode="after") def _sources_match_contents(self) -> Self: @@ -157,42 +165,31 @@ async def validate_mthds(request_data: ValidateRequest) -> JSONResponse: signatures are reported as `pending_signatures` + `is_runnable: false`, never as an error. - **Invalid verdict (200, `is_valid: false`):** the `InvalidReport` arm — `validation_errors[]` (the structured per-error diagnostics, built by pipelex's one shared builder, incl. the - `dry_run` residual item) + `message`, with the structural artifacts absent. This is what the - route synthesizes by catching the runtime's `ValidateBundleError` (direct mode) and the - Temporal-recovered `WorkflowExecutionError` (whose `to_error_report()` recovers the original - `ValidateBundleError` report) — neither reaches the global handler. + `dry_run` residual item) + `message`, with the structural artifacts absent. The runner + returns this as a value (`ErrorReport`) regardless of backend — the in-process arm from the + bundle's `ValidateBundleError`, the dispatched arm recovered from the worker — so the route + maps it to a 200 by matching the returned verdict, never by catching an exception. - **No verdict (non-2xx):** a malformed request body or an `mthds_sources` length mismatch is a - request-shape **422**; a host-wiring programmer error is a `PipelexUnexpectedError` → **500**; - auth is **401/403**. All are RFC 7807 `application/problem+json` rendered by the global - handler in `api.exception_handlers` — routes never shape them. 
A genuine Temporal workflow - fault (a `WorkflowExecutionError` that recovers no `ValidateBundleError`) is re-raised here - so it lands as a 5xx, not a verdict. + request-shape **422**; a forbidden `orchestration_mode` override is a **403**; a host-wiring + programmer error or a genuine orchestrator fault is a **5xx**; auth is **401/403**. All are + RFC 7807 `application/problem+json` rendered by the global handler in + `api.exception_handlers` — routes never shape them. """ # Opt-in presentation formats (D-D): resolved once, threaded into both 200 arms. Empty by # default → no `rendered_*` field, response byte-identical to the no-`render` request. requested_formats = _resolve_render_formats(request_data.render) - try: - report = await ApiRunner().validate( - mthds_contents=request_data.mthds_contents, - allow_signatures=request_data.allow_signatures, - # `mthds_sources` rides the protocol's `extra` extension hook (mthds-python 0.5.0 - # generalized the concrete param to `extra: dict | None`). Omitted when absent so the - # sourceless path is unchanged. - extra={"mthds_sources": request_data.mthds_sources} if request_data.mthds_sources is not None else None, - ) - except ValidateBundleError as validation_error: - # Direct backend: an invalid bundle is a produced verdict (200 InvalidReport), not a - # transport failure — intercept it before the global 422 handler. - return _invalid_report_response(validation_error.to_error_report(), requested_formats=requested_formats) - except WorkflowExecutionError as workflow_error: - # Temporal backend: a content verdict crosses the activity boundary as a - # WorkflowExecutionError that recovers the original ValidateBundleError report. A genuine - # workflow fault recovers no such report → re-raise to the global problem+json handler - # (it is a no-verdict server condition, not a verdict the client submitted). 
- recovered_report = workflow_error.to_error_report() - if recovered_report.error_type != ValidateBundleError.__name__: - raise - return _invalid_report_response(recovered_report, requested_formats=requested_formats) + # Verdict-as-value: the runner resolves the orchestration mode and dispatches through the bundle + # validator registry, returning the verdict. A produced invalid verdict is an `ErrorReport` + # (→ 200 InvalidReport); only a no-verdict fault propagates to the global problem+json handler. + verdict = await ApiRunner().validate_verdict( + mthds_contents=request_data.mthds_contents, + mthds_sources=request_data.mthds_sources, + allow_signatures=request_data.allow_signatures, + requested_orchestration_mode=request_data.orchestration_mode, + ) + if not isinstance(verdict, PipelexValidationReport): + return _invalid_report_response(verdict, requested_formats=requested_formats) + report = verdict # Splat the report's own field/value pairs so a future canonical field rides the wire # automatically — the wrapper never enumerates (and silently drops) report fields. `is_valid` diff --git a/api/schemas/models.py b/api/schemas/models.py index c8c88df..76f126c 100644 --- a/api/schemas/models.py +++ b/api/schemas/models.py @@ -106,6 +106,16 @@ class StartRequest(RunRequest): pipeline_run_id: str | None = Field(default=None, max_length=128) +_ORCHESTRATION_MODE_DESCRIPTION = ( + "PIPELEX-API EXTENSION (not part of the MTHDS Protocol) — request the orchestration mode (the backend) " + "for this run. An OPEN string token: `direct` (in-process, the base default), `temporal`, and any other " + "plugin-provided token are accepted; an unregistered token is refused at dispatch. The delivery axis " + "(blocking vs fire-and-forget) is endpoint-set, never requestable. Honored ONLY when the deployment sets " + "`allow_request_orchestration_mode_override = true` in its `api.toml`; otherwise a token that differs from " + "the deployment default is refused with a 403. 
Omit it to use the deployment default." +) + + _ALLOWED_CALLBACK_SCHEMES = frozenset({"http", "https"}) @@ -141,6 +151,7 @@ class PipelineApiExtras(BaseModel): pipeline_run_id: str | None = Field(default=None, max_length=128) callback_urls: list[str] | None = Field(default=None, max_length=MAX_CALLBACK_URLS) + orchestration_mode: str | None = Field(default=None, description=_ORCHESTRATION_MODE_DESCRIPTION) @field_validator("callback_urls") @classmethod @@ -181,6 +192,18 @@ class PipelexApiStartRequest(StartRequest): "and cloud-metadata hosts are rejected." ), ) + orchestration_mode: str | None = Field(default=None, description=_ORCHESTRATION_MODE_DESCRIPTION) + + +class PipelexApiExecuteRequest(RunRequest): + """Documented body of `POST /execute` — the protocol's `RunRequest` plus THIS server's `orchestration_mode` extension. + + Used only to publish the OpenAPI request schema: `/execute` reads the body through the raw + `Request` (kajson decoding), so FastAPI cannot infer the body type; this model documents the + per-request `orchestration_mode` override the route actually honors (parsed by `PipelineApiExtras`). + """ + + orchestration_mode: str | None = Field(default=None, description=_ORCHESTRATION_MODE_DESCRIPTION) class MthdsContentsRequest(BaseModel): diff --git a/docs/configuration.md b/docs/configuration.md index 3b9ac77..819b523 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -2,9 +2,9 @@ This page covers how to configure the **Docker image** — the env vars it needs at boot, and how to pass your own Pipelex config files into the container. -For the syntax and meaning of Pipelex config itself (storage backends, tracing, inference routing, Temporal, model decks, …), see the official Pipelex documentation: **https://docs.pipelex.com**. This page does not duplicate that. 
+For the syntax and meaning of Pipelex config itself (storage backends, tracing, inference routing, model decks, …), see the official Pipelex documentation: **https://docs.pipelex.com**. This page does not duplicate that. -> **The official `pipelex/pipelex-api` image is generic.** It boots with Temporal disabled, no S3, no remote tracing, and the Pipelex Gateway as the only enabled inference backend. Anything environment-specific is meant to be supplied by you, on top of the image, via a mounted `.pipelex/` override file. +> **The official `pipelex/pipelex-api` image is generic and orchestrator-agnostic.** It runs every pipeline **in-process** (no distributed orchestrator), with no S3, no remote tracing, and the Pipelex Gateway as the only enabled inference backend. Anything environment-specific is meant to be supplied by you, on top of the image, via a mounted `.pipelex/` override file. Distributed execution (Temporal, Mistral Workflows, …) is **not** built in — it is added by installing exactly one orchestrator plugin on top of this base to produce a deployment *flavor* (see "Orchestration mode" below). ## Environment variables @@ -101,6 +101,36 @@ In the official Docker image, the `.pipelex/` directory shipped in this reposito For the schema and meaning of every key in these files, see https://docs.pipelex.com. +## Orchestration mode + +The base is **orchestrator-agnostic**. WHICH backend a top-level run dispatches to is a deployment choice, read from a separate **`api.toml`** config file (not the main `pipelex_{env}.toml` — the core config rejects unknown sections). It is layered exactly like the Pipelex config above, but with its own base name: `api.toml` (packaged default) → `api_{PIPELEX_ENV}.toml` → `api_override.toml`. Two keys: + +| Key | Meaning | Base default | +| --- | --- | --- | +| `orchestration_mode` | Which **backend** (orchestrator) a top-level run dispatches to. 
An **open string token**: `direct` (in-process), `temporal`, and any plugin-provided token are accepted; an unregistered token fails loud at dispatch with the plugin's install hint. | `direct` | +| `allow_request_orchestration_mode_override` | Whether a caller may set `orchestration_mode` per request on `POST /v1/execute`, `POST /v1/start`, and `POST /v1/validate`. When `false`, a requested token that differs from the deployment default is refused with a `403`. | `false` | + +The packaged default (`orchestration_mode = "direct"`, override off) is what the generic image ships. + +**`orchestration_mode` is one of two orthogonal axes.** It names only the **backend**. The other axis — *delivery*, i.e. whether the caller waits — is **endpoint-intrinsic**, never configured and never requestable: + +- `POST /v1/execute` and `POST /v1/validate` are **synchronous** (`BLOCKING` delivery): they return the full output / the verdict. +- `POST /v1/start` is **fire-and-forget** (`FIRE_AND_FORGET` delivery): it returns immediately with a `workflow_id`. It works only on a backend that is genuinely async-capable. A Temporal deployment (`orchestration_mode = "temporal"`) enqueues on `/start` and returns a `workflow_id`. On the orchestrator-agnostic base (`orchestration_mode = "direct"`) the in-process orchestrator is blocking-only, so `/start` is **HONEST**: it refuses with a `400` (`StartRequiresAsyncOrchestration`) — use `/execute` — rather than silently running blocking and acking. + +So a deployment sets **one** `orchestration_mode` and each endpoint applies its own delivery — there is no fire-and-forget token to configure or request. + +**`orchestration_mode` vs `boot_orchestrator` — two knobs, two jobs.** `orchestration_mode` (here) selects the backend a **top-level entry** (`/execute`, `/start`, `/validate`) dispatches to. 
`boot_orchestrator` (a core Pipelex setting) selects the **execution stack** used wherever a pipe actually runs — on a distributed worker, and for the in-process scoping inside the `direct` orchestrator. On a correctly-configured deployment the two agree (a Temporal flavor sets `orchestration_mode = "temporal"` *and* boots under Temporal); keeping them distinct is what lets `orchestration_mode` be the single source of truth for top-level dispatch without coupling it to how the stack is booted. A `temporal` `orchestration_mode` still requires the process to be booted under Temporal — set them together on a Temporal flavor. + +A **flavor** image (e.g. the hosted Temporal flavor) installs one orchestrator plugin and bakes an `api_{env}.toml` to flip the default, e.g.: + +```toml +# api_prod.toml (keys at the file root — no [api] wrapper) +orchestration_mode = "temporal" # /start is fire-and-forget (async-capable); /execute + /validate stay blocking +allow_request_orchestration_mode_override = false +``` + +Mount your own `api_{env}.toml` / `api_override.toml` into `/root/.pipelex/` exactly like any other override file (see below). + ## Providing your own configuration to Docker Two patterns. Both rely on mounting files into `/root/.pipelex/` inside the container. @@ -195,6 +225,6 @@ API_KEY=your-strong-secret Clients now need `Authorization: Bearer your-strong-secret`. -### Customizing Pipelex (storage, tracing, inference, Temporal, …) +### Customizing Pipelex (storage, tracing, inference, model decks, …) Write a `pipelex_override.toml` (or env-specific `pipelex_.toml`) with the keys you want to change. Reference any provider credentials from env vars via `${VAR}` so they stay out of the file. Mount it into the container as shown above. Refer to https://docs.pipelex.com for the full set of available keys and their semantics. 
diff --git a/docs/error-responses.md b/docs/error-responses.md index 5b9b854..79c6510 100644 --- a/docs/error-responses.md +++ b/docs/error-responses.md @@ -101,7 +101,7 @@ API-authored errors (`ValidationError`, `BadRequest`, `Unauthenticated`, etc.) f ## Request correlation -Every response carries `X-Request-ID`. The middleware respects an inbound `X-Request-ID` header if present, otherwise generates a UUID. The same id rides through to the Temporal worker via `JobMetadata.request_id` and is bound onto every `WorkflowLog` / `ActivityLog` record produced during the run — so a single id correlates the inbound HTTP call, the API-side log line, and every worker-side log line for the run. +Every response carries `X-Request-ID`. The middleware respects an inbound `X-Request-ID` header if present, otherwise generates a UUID. The same id rides through onto `JobMetadata.request_id`, so it correlates the inbound HTTP call with the API-side log line — and, on a distributed-execution flavor, with every orchestrator worker-side log record produced during the run. When opening an issue, include the `request_id` from the response (or response headers) and the timestamp. diff --git a/docs/index.md b/docs/index.md index 29e9d16..37c3f4a 100644 --- a/docs/index.md +++ b/docs/index.md @@ -71,7 +71,7 @@ The response contains `state: "COMPLETED"` and the result under `pipe_output.wor ### 4. Customize the configuration -Need to disable Temporal, point to a different storage backend, or ship your own model deck? See **[Configuration →](configuration.md)** for how to provide your own `.pipelex/` config files to the Docker image and a quick recipe for running without Temporal. +Need to change the execution mode, point to a different storage backend, or ship your own model deck? See **[Configuration →](configuration.md)** for how to provide your own `.pipelex/` config files to the Docker image. 
The base runs every pipeline in-process by default; distributed execution (Temporal, …) is added by a deployment flavor, not configured on the base. ## Base URL diff --git a/docs/openapi/pipelex-api.openapi.yaml b/docs/openapi/pipelex-api.openapi.yaml index 0a236e7..e9f06e4 100644 --- a/docs/openapi/pipelex-api.openapi.yaml +++ b/docs/openapi/pipelex-api.openapi.yaml @@ -181,9 +181,17 @@ paths: description: 'Execute a method synchronously and return its full output (MTHDS Protocol `POST /execute`). - Pipelex domain failures propagate untouched: the global `PipelexError` + The backend is selected by the resolved `orchestration_mode` (deployment default + optional + + policy-gated per-request override via the `orchestration_mode` extra), symmetric with `/start` — + + not by `boot_orchestrator`. `/execute` is synchronous, so it dispatches with `BLOCKING` delivery + + regardless of backend (wait-semantics is endpoint-set, never requestable). Pipelex domain - handler in `api.exception_handlers` turns them into an RFC 7807 problem response.' + failures propagate untouched: the global `PipelexError` handler in `api.exception_handlers` + + turns them into an RFC 7807 problem response.' operationId: execute_v1_execute_post requestBody: content: @@ -255,19 +263,29 @@ paths: - type: string - type: 'null' title: Dynamic Output Concept Ref + orchestration_mode: + anyOf: + - type: string + - type: 'null' + title: Orchestration Mode + description: 'PIPELEX-API EXTENSION (not part of the MTHDS Protocol) — request the orchestration mode (the + backend) for this run. An OPEN string token: `direct` (in-process, the base default), `temporal`, and + any other plugin-provided token are accepted; an unregistered token is refused at dispatch. The delivery + axis (blocking vs fire-and-forget) is endpoint-set, never requestable. 
Honored ONLY when the deployment + sets `allow_request_orchestration_mode_override = true` in its `api.toml`; otherwise a token that differs + from the deployment default is refused with a 403. Omit it to use the deployment default.' additionalProperties: true type: object - title: RunRequest - description: "Body of `POST /execute` — this server's typed request model.\n\nThe MTHDS Protocol has no request\ - \ model (`mthds` deleted `RunRequest`: the\nrequest body is just the basic args the runner already takes as\ - \ named\nparameters). pipelex-api keeps a typed model so it can publish the request\nschema in its OpenAPI\ - \ artifact and parse the body once.\n\nThe declared fields are the protocol's **basic** arguments. The model\ - \ is\ndeliberately open (`extra=\"allow\"`): a caller may send extra request\nproperties (an extension may\ - \ carry the method selector), and they are kept\nrather than silently dropped.\n\nAttributes:\n pipe_code:\ - \ Code of the pipe to execute.\n mthds_contents: List of MTHDS bundle contents to load.\n inputs: Inputs\ - \ in PipelineInputs format — Pydantic validation is skipped\n to preserve the flexible format (dicts,\ - \ strings, StuffContent objects, etc.).\n output_name: Name of the output slot to write to.\n output_multiplicity:\ - \ Output multiplicity setting.\n dynamic_output_concept_ref: Override for the dynamic output concept ref." + title: PipelexApiExecuteRequest + description: 'Documented body of `POST /execute` — the protocol''s `RunRequest` plus THIS server''s `orchestration_mode` + extension. + + + Used only to publish the OpenAPI request schema: `/execute` reads the body through the raw + + `Request` (kajson decoding), so FastAPI cannot infer the body type; this model documents the + + per-request `orchestration_mode` override the route actually honors (parsed by `PipelineApiExtras`).' 
required: true responses: '200': @@ -282,7 +300,7 @@ paths: tags: - run summary: Start - description: 'Start a method asynchronously; returns its pipeline_run_id immediately (MTHDS Protocol `POST /start`). + description: 'Start a method run and return its pipeline_run_id with a 202 ack (MTHDS Protocol `POST /start`). Answers `202 Accepted` with a `StartAck`. A client-supplied `pipeline_run_id` is @@ -293,7 +311,24 @@ paths: `PipelexError` handler in `api.exception_handlers` turns them into an - RFC 7807 problem response.' + RFC 7807 problem response. + + + Fire-and-forget is a property of THIS endpoint (its delivery axis), honored only by an + + async-capable backend. A deployment configures the backend (`orchestration_mode`) once; `/start` + + sets `FIRE_AND_FORGET` delivery and checks the resolved orchestrator can honor it. A Temporal + + deployment (`orchestration_mode = "temporal"`) enqueues the run and returns immediately with a + + `workflow_id`. On the orchestrator-agnostic base (`orchestration_mode = "direct"`, the default) + + the in-process orchestrator is blocking-only, so `/start` is HONEST: it refuses with a `400` + + (`StartRequiresAsyncOrchestration`) — use `/execute` — rather than silently blocking and acking. + + The completion callback (`callback_urls` / storage delivery) fires on the async path.' operationId: start_v1_start_post requestBody: content: @@ -381,6 +416,17 @@ paths: description: PIPELEX-API EXTENSION (not part of the MTHDS Protocol) — completion webhooks. When the run finishes, the runner POSTs the RunResult to each URL, HMAC-SHA256-signed via the X-Completion-Signature header. http/https only; private, loopback, link-local and cloud-metadata hosts are rejected. + orchestration_mode: + anyOf: + - type: string + - type: 'null' + title: Orchestration Mode + description: 'PIPELEX-API EXTENSION (not part of the MTHDS Protocol) — request the orchestration mode (the + backend) for this run. 
An OPEN string token: `direct` (in-process, the base default), `temporal`, and + any other plugin-provided token are accepted; an unregistered token is refused at dispatch. The delivery + axis (blocking vs fire-and-forget) is endpoint-set, never requestable. Honored ONLY when the deployment + sets `allow_request_orchestration_mode_override = true` in its `api.toml`; otherwise a token that differs + from the deployment default is refused with a 403. Omit it to use the deployment default.' additionalProperties: true type: object title: PipelexApiStartRequest @@ -416,14 +462,13 @@ paths: \ and carries `graph_spec=null`. Pending\n signatures are reported as `pending_signatures` + `is_runnable: false`,\ \ never as an error.\n- **Invalid verdict (200, `is_valid: false`):** the `InvalidReport` arm — `validation_errors[]`\n\ \ (the structured per-error diagnostics, built by pipelex's one shared builder, incl. the\n `dry_run` residual item)\ - \ + `message`, with the structural artifacts absent. This is what the\n route synthesizes by catching the runtime's\ - \ `ValidateBundleError` (direct mode) and the\n Temporal-recovered `WorkflowExecutionError` (whose `to_error_report()`\ - \ recovers the original\n `ValidateBundleError` report) — neither reaches the global handler.\n- **No verdict (non-2xx):**\ - \ a malformed request body or an `mthds_sources` length mismatch is a\n request-shape **422**; a host-wiring programmer\ - \ error is a `PipelexUnexpectedError` → **500**;\n auth is **401/403**. All are RFC 7807 `application/problem+json`\ - \ rendered by the global\n handler in `api.exception_handlers` — routes never shape them. A genuine Temporal workflow\n\ - \ fault (a `WorkflowExecutionError` that recovers no `ValidateBundleError`) is re-raised here\n so it lands as a\ - \ 5xx, not a verdict." + \ + `message`, with the structural artifacts absent. 
The runner\n returns this as a value (`ErrorReport`) regardless\ + \ of backend — the in-process arm from the\n bundle's `ValidateBundleError`, the dispatched arm recovered from the\ + \ worker — so the route\n maps it to a 200 by matching the returned verdict, never by catching an exception.\n- **No\ + \ verdict (non-2xx):** a malformed request body or an `mthds_sources` length mismatch is a\n request-shape **422**;\ + \ a forbidden `orchestration_mode` override is a **403**; a host-wiring\n programmer error or a genuine orchestrator\ + \ fault is a **5xx**; auth is **401/403**. All are\n RFC 7807 `application/problem+json` rendered by the global handler\ + \ in\n `api.exception_handlers` — routes never shape them." operationId: validate_mthds_v1_validate_post requestBody: content: @@ -3421,6 +3466,16 @@ components: adds a `rendered_` field (e.g. `rendered_markdown`) to the 200 verdict, on both the valid and invalid arms. Unknown/unsupported tokens are silently ignored (presentation hint, not part of the verdict contract); the default empty list renders nothing and the response is unchanged.' + orchestration_mode: + anyOf: + - type: string + - type: 'null' + title: Orchestration Mode + description: 'Optional per-request orchestration-mode (backend) override for the validation dispatch (same plumbing + as `/start`). An OPEN string token: `direct` validates in-process; a `temporal` mode dispatches the whole job + to a worker; any plugin-provided token is accepted and an unregistered one is refused at dispatch. Honored only + when the deployment sets `allow_request_orchestration_mode_override = true` in its `api.toml`; otherwise a token + that differs from the deployment default is refused with a 403. Omitted → the default.' 
type: object required: - mthds_contents diff --git a/docs/pipe-run.md b/docs/pipe-run.md index 6aa9ee3..61d16c9 100644 --- a/docs/pipe-run.md +++ b/docs/pipe-run.md @@ -10,6 +10,8 @@ Execute a Pipelex pipeline with flexible inputs and wait for completion. **Endpoint:** `POST /v1/execute` +> **Backend selected by `orchestration_mode`.** `/execute` dispatches through the deployment's `orchestration_mode` (config default + optional policy-gated per-request `orchestration_mode` override), symmetric with `/start` — see [Configuration → Orchestration mode](configuration.md). On the orchestrator-agnostic base (`direct`, the default) it runs **in-process**; a `temporal` flavor dispatches the run to a worker and awaits it. `/execute` is **synchronous** (it returns the full output) and always uses `BLOCKING` delivery — wait-semantics is endpoint-set, never requestable, so there is no fire-and-forget option here (use `POST /v1/start`). A per-request backend override is honored only where the deployment sets `allow_request_orchestration_mode_override = true`; otherwise a token differing from the default is refused with a `403`. + **Request Body:** ```json @@ -74,10 +76,12 @@ Execute a Pipelex pipeline with flexible inputs and wait for completion. ### Start Pipeline -Start a pipeline execution without waiting for completion (non-blocking). +Start a pipeline execution and get its `pipeline_run_id` back with a `202` ack. **Endpoint:** `POST /v1/start` +> **Fire-and-forget is a property of this endpoint, honored only by an async-capable backend.** `orchestration_mode` names only the deployment's backend; `/start` sets `FIRE_AND_FORGET` delivery and requires an orchestrator that can honor it. A Temporal deployment (`orchestration_mode = "temporal"`) enqueues the run and returns immediately with a `workflow_id`. 
On the orchestrator-agnostic base (`orchestration_mode = "direct"`, the default — see [Configuration → Orchestration mode](configuration.md)) the in-process orchestrator is blocking-only, so `/start` is **HONEST**: it refuses with a `400` (`StartRequiresAsyncOrchestration`) — use `POST /v1/execute` — rather than silently running blocking and acking. The completion callback fires on the async path. + **Request Body:** ```json @@ -138,7 +142,7 @@ Start a pipeline execution without waiting for completion (non-blocking). - `finished_at` (null): Always `null`; the pipeline hasn't completed. - `main_stuff_name` (null): Always `null`; populated only on the eventual completion callback. - `pipe_output` (null): Always `null`; the result isn't ready yet. -- `workflow_id` (string | null): The Temporal workflow ID, when Temporal is enabled. `null` otherwise. +- `workflow_id` (string | null): The async orchestrator's workflow ID. A `202` ack is only ever returned by an async-capable backend (e.g. a Temporal flavor), so this carries that orchestrator's id; the in-process `direct` base never acks here — it returns a `400` (`StartRequiresAsyncOrchestration`) instead. **Errors** follow the same convention as `/execute`: HTTP 4xx/5xx with an [RFC 7807 `application/problem+json`](error-responses.md) body. diff --git a/docs/pipe-validate.md b/docs/pipe-validate.md index c4cf1bf..1e2c4a2 100644 --- a/docs/pipe-validate.md +++ b/docs/pipe-validate.md @@ -98,27 +98,26 @@ The 200 body is one of two arms, discriminated on the mandatory `is_valid` field **What This Endpoint Does:** -The route wraps the runtime's protocol `validate`: parse → load → dry-run-sweep every pipe → build the per-pipe IO contracts → best-effort graph of the `main_pipe` → assemble the canonical report. When the runtime instead raises a `ValidateBundleError` (a bundle the caller can fix), the route converts it to the 200 invalid arm rather than letting it become a transport error. 
A bundle that declares no `main_pipe` validates normally and simply carries `graph_spec: null` — there is no main-pipe precondition. +The route wraps the runtime's protocol `validate`: parse → load → dry-run-sweep every pipe → build the per-pipe IO contracts → best-effort graph of the `main_pipe` → assemble the canonical report. The runner returns the verdict as a value — the canonical report on the valid arm, or a structured `ErrorReport` (a bundle the caller can fix) on the invalid arm — and the route maps the invalid verdict to the 200 invalid arm by matching the returned value, never by catching a transport error. A bundle that declares no `main_pipe` validates normally and simply carries `graph_spec: null` — there is no main-pipe precondition. **Sourcing submitted files:** The submit path carries bundle text, not file paths, so by default the runtime cannot tell the client which file an error belongs to — `source` comes back `null`. Send `mthds_sources` parallel to `mthds_contents` to fix this: each source is the logical identity of that content (e.g. the file's path relative to the submitted directory), and the runtime threads it onto the corresponding `blueprint.source`. The source then rides back on both arms — `bundle_blueprint.source` on the valid arm, and `validation_errors[].source` on the invalid arm — so a multi-file editor client can map a cross-file diagnostic to the file that owns it. Omit `mthds_sources` (or send `null`) and behavior is exactly as before. The list, when present, must be the same length as `mthds_contents`; a mismatch is a request-shape 422 (it is the caller's wiring bug, caught before the validation sweep runs). 
-**Execution Backends:** +**Where validation runs:** -The endpoint behaves identically on both deployment backends; only where the work runs differs: +Validation is **`orchestration_mode`-aware**, the same way `/start` is: the runner resolves the effective backend (the deployment default plus the optional per-request `orchestration_mode` override) and dispatches through the bundle-validator registry. Validation is inherently blocking, so there is no delivery axis here — only the backend varies. On the orchestrator-agnostic base — and for `orchestration_mode: direct` — the whole job runs **in-process in one library load on the API side**. On an orchestrator flavor whose mode is selected (e.g. `temporal`), the whole job is **dispatched to a worker** instead, and the API side assembles the same canonical report from the worker's result without loading a library. Either way the verdict is byte-identical: the backend changes, the contract does not. A per-request override the deployment forbids is refused with a 403. -- **Direct (Temporal disabled):** the whole job runs in-process in the API server, one library load. -- **Temporal enabled:** the API dispatches the whole job — validation sweep, graph dry-run, and the worker-side artifacts (`pipe_io_contracts`, `pending_signatures`) — to a worker as **one** in-process activity (`wf_dry_validate` → `act_dry_validate`) and awaits the result in a single round-trip. The API side only parses the blueprints and assembles the same report; it never loads a library. An invalid verdict crosses the boundary as a structured error report (a `WorkflowExecutionError` recovering the original `ValidateBundleError`), which the route detects and renders as the same 200 invalid arm as the direct path. A genuine workflow fault (one that recovers no `ValidateBundleError`) stays a 5xx. 
+> **Resource note for deployment.** When validation runs in-process (the agnostic base, or `direct` mode), the API server loads the method library to validate, so a deployment that receives large or frequent in-process `/validate` traffic should be sized for that load (memory + CPU for library assembly and the graph dry-run). On a distributed-execution flavor that dispatches validation to a worker, the library work happens worker-side; size the workers accordingly. -The graph remains best-effort on both backends: a bundle that validates but whose graph dry-run fails still returns 200 on the valid arm with `graph_spec: null`. +The graph is best-effort: a bundle that validates but whose graph dry-run fails still returns 200 on the valid arm with `graph_spec: null`. **No-verdict (non-2xx) responses:** Only conditions where the endpoint could not produce a verdict are non-2xx, rendered as [RFC 7807 problem documents](error-responses.md): - **422** — a malformed request body, or an `mthds_sources` / `mthds_contents` length mismatch (a request-shape error caught before the runtime). -- **401 / 403** — unauthenticated / forbidden. -- **5xx** — a server fault (including a host-wiring programmer error, surfaced as `PipelexUnexpectedError`, and a genuine Temporal workflow fault). +- **401 / 403** — unauthenticated / forbidden (including a per-request `orchestration_mode` override the deployment does not allow). +- **5xx** — a server fault (including a host-wiring programmer error, surfaced as `PipelexUnexpectedError`, and a genuine orchestrator fault on a distributed-execution flavor). Read it as one rule: a non-2xx on `/validate` always means "the endpoint could not produce a verdict," never "your bundle is bad." 
diff --git a/pyproject.toml b/pyproject.toml index c26914d..545ca33 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ classifiers = [ ] dependencies = [ - "pipelex[mistralai,anthropic,google,google-genai,bedrock,fal,temporal]==0.35.0", + "pipelex[mistralai,anthropic,google,google-genai,bedrock,fal]==0.35.0", "mthds>=0.5.0", "fastapi>=0.118.0", "pyjwt>=2.10.1", @@ -77,6 +77,15 @@ packages = ["api"] [tool.uv] required-version = ">=0.7.2" +# The orchestration_mode / delivery split (open `orchestration_mode` token + closed +# `DeliveryMode`, per-call OrchestratorRegistry / BundleValidatorRegistry dispatch) is +# developed against the unreleased `pipelex` core in the sibling `../_plugins` worktree. +# Pinned to the pushed core orchestration-mode/delivery split git rev so CI can resolve it +# (an editable local path is unresolvable in CI). For local co-development, repoint this to +# the editable `../_plugins` path; flip to the `==` PyPI pin once the core change ships. +[tool.uv.sources] +pipelex = { git = "https://github.com/Pipelex/pipelex.git", rev = "50ff1f79f1bf38a81cf6f9c16063e32ed51b1e57" } + [tool.mypy] check_untyped_defs = true exclude = "^.*\\.venv/.*$" diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 069fdc4..57d01e0 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -4,21 +4,27 @@ from pipelex.test_extras.shared_pytest_plugins import needs_inference_in_pipelex from pytest import FixtureRequest +from api.api_config import get_api_config + @pytest.fixture(autouse=True) def reset_api_config_fixture(request: FixtureRequest): # Code to run before each test print("\n[magenta] Api setup[/magenta]") + # The base runner is orchestrator-agnostic: with no orchestrator plugin installed and no + # `boot_orchestrator` set, every pipeline (incl. dry-run validation) runs DIRECT in-process, + # which is exactly what the hermetic suite needs. 
The former `temporal_enabled=False` knob is + # gone — Temporal is now an external plugin, absent from this repo's deps entirely. pipelex_instance = Pipelex.make( integration_mode=IntegrationMode.PYTEST, needs_inference=needs_inference_in_pipelex(request), - # Force Temporal off so the suite is hermetic and runs pipelines (incl. dry-run validation) - # in-process. Temporal is opt-in via a gitignored .pipelex/pipelex_override.toml; CI has no - # such override, so this just pins local runs to the CI configuration. Tests that exercise the - # Temporal-backed routes mock the runner, so none need a live server. - temporal_enabled=False, ) + # Drop the process-cached `[api]` config so a test that patches its env / loader cannot leak a + # mutated config into later tests through the `@cache`d `get_api_config()` (the suite otherwise + # relies on the packaged `direct` default — e.g. the `POST /start` override-policy 403 test). + get_api_config.cache_clear() yield # Code to run after each test print("\n[magenta] Api teardown[/magenta]") + get_api_config.cache_clear() pipelex_instance.teardown() diff --git a/tests/unit/test_api_config.py b/tests/unit/test_api_config.py new file mode 100644 index 0000000..72ce509 --- /dev/null +++ b/tests/unit/test_api_config.py @@ -0,0 +1,78 @@ +"""`[api]` deployment config — the default orchestration mode + the per-request override policy. + +The orchestrator-agnostic base reads WHICH backend a top-level run dispatches to from a packaged +`api.toml` (`ApiConfig`), and gates per-request overrides behind a deployment policy. `orchestration_mode` +is an OPEN string token (core owns `"direct"`; each plugin owns its own); the delivery axis is +endpoint-set, never configured. These tests pin the packaged default (`direct`, override off) and the +resolver's policy: the default wins, a caller may only change the backend when the deployment opted in, +and a forbidden override is a 403 — asserted both at the resolver and end-to-end on `POST /start`. 
+""" + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from api.api_config import ApiConfig, get_api_config, resolve_orchestration_mode +from api.errors import ApiError +from api.exception_handlers import register_exception_handlers +from api.middleware import RequestIdMiddleware +from api.routes.pipelex.pipeline import router as pipeline_router +from tests.unit._constants import VALID_MTHDS + + +def _temporal_locked_config() -> ApiConfig: + """A hosted-style config: the `temporal` backend, override OFF. + + `orchestration_mode` names only the backend; the delivery axis (blocking vs fire-and-forget) is + endpoint-set, never configured, so there is no fire-and-forget token to reject here. + """ + return ApiConfig(orchestration_mode="temporal", allow_request_orchestration_mode_override=False) + + +class TestApiConfigDefault: + def test_packaged_default_is_direct_no_override(self): + # The open-source base names no orchestrator: it ships `direct` and refuses overrides. + config = get_api_config() + assert config.orchestration_mode == "direct" + assert config.allow_request_orchestration_mode_override is False + + +class TestResolveOrchestrationMode: + def test_none_request_uses_deployment_default(self): + assert resolve_orchestration_mode(None, config=_temporal_locked_config()) == "temporal" + + def test_request_equal_to_default_is_honored(self): + # A no-op override (same as the default) is always accepted, override policy or not. + config = _temporal_locked_config() + assert resolve_orchestration_mode("temporal", config=config) == "temporal" + + def test_forbidden_override_is_refused(self): + # A caller must not be able to force `direct` on a locked-down distributed runner. 
+ with pytest.raises(ApiError) as exc_info: + resolve_orchestration_mode("direct", config=_temporal_locked_config()) + assert exc_info.value.status_code == 403 + assert exc_info.value.document["error_type"] == "OrchestrationModeOverrideForbidden" + + def test_allowed_override_is_honored(self): + config = ApiConfig(orchestration_mode="temporal", allow_request_orchestration_mode_override=True) + assert resolve_orchestration_mode("direct", config=config) == "direct" + + +class TestStartOverridePolicyEndToEnd: + def _client(self) -> TestClient: + app = FastAPI() + app.include_router(pipeline_router, prefix="/v1") + register_exception_handlers(app) + return TestClient(RequestIdMiddleware(app)) + + def test_forbidden_per_request_mode_on_start_is_403(self): + # The base config is `direct` with override off, so a caller forcing a different backend on + # `POST /start` is refused with a 403 BEFORE any library load / dispatch — the policy gate + # is the first thing `ApiRunner.start` checks. + response = self._client().post( + "/v1/start", + json={"pipe_code": "echo", "mthds_contents": [VALID_MTHDS], "orchestration_mode": "temporal"}, + ) + assert response.status_code == 403, response.text + assert response.headers["content-type"].startswith("application/problem+json") + assert response.json()["error_type"] == "OrchestrationModeOverrideForbidden" diff --git a/tests/unit/test_exception_handlers.py b/tests/unit/test_exception_handlers.py index e18b645..4da69e5 100644 --- a/tests/unit/test_exception_handlers.py +++ b/tests/unit/test_exception_handlers.py @@ -2,8 +2,10 @@ These tests register the production handlers on a throwaway FastAPI app whose routes raise straight through — `ApiError`, `RequestValidationError`, -`PipelexError`, bare `TemporalError`, and uncaught `Exception` each end up at -their respective handler exactly the way the real app routes them. 
+`PipelexError`, a bare orchestrator-SDK transport error (handled via a synthetic +plugin-contributed mapper, proving the F3 seam without importing any orchestrator +SDK), and uncaught `Exception` each end up at their respective handler exactly +the way the real app routes them. """ import re @@ -22,13 +24,14 @@ ) from pipelex.cogt.inference.error_classification import ProviderErrorMetadata, UserAction, UserActionKind from pipelex.cogt.inference.provider_name import ProviderName +from pipelex.config import get_config from pipelex.pipe_run.exceptions import AsyncExecutionNotEnabledError from pipelex.pipeline.exceptions import PipelineManagerAlreadyExistsError +from pipelex.plugins.contract import PLUGIN_API_VERSION +from pipelex.plugins.registrar import HttpErrorMapperFn, PluginOrigin, PluginRegistrar from pipelex.system.exceptions import EnvVarNotFoundError -from pipelex.temporal.exceptions import WorkflowExecutionError from pydantic import BaseModel, ConfigDict from pytest_mock import MockerFixture -from temporalio.exceptions import TemporalError from typing_extensions import override from api.error_types import ErrorType @@ -132,8 +135,53 @@ class _CallerFacingInputError(PipelexError): _authors_caller_facing_message = True -class _FakeTemporalTransportError(TemporalError): - """A non-`PipelexError` `TemporalError` subclass — a Temporal transport failure.""" +class _SyntheticTransportError(Exception): + """A bare orchestrator-SDK-style transport error — NOT a `PipelexError`, no `temporalio`. + + Stands in for the kind of exception an orchestrator plugin contributes a mapper for (e.g. + `temporalio.TemporalError`), so the F3 seam — plugin maps a bare transport exception to a + structured `ErrorReport`, the API renders it — is exercised without importing any orchestrator SDK. + """ + + +class _SyntheticOrchestratorError(PipelexError): + """An orchestrator failure that IS a `PipelexError` (carries its own `ErrorReport`). + + Stands in for e.g. 
the Temporal plugin's `WorkflowExecutionError`: Starlette's MRO walk resolves + it to `handle_pipelex_error`, never to the transport mapper handler registered for a bare + transport exception. + """ + + +def _synthetic_transport_to_report(exc: Exception) -> ErrorReport: + """The mapper an orchestrator plugin would contribute: classify the bare transport fault. + + Mirrors the classification the (now plugin-owned) Temporal transport mapper produces — a + retryable, `RUNTIME`-domain `transient` failure. + """ + return ErrorReport( + error_type="SyntheticTransportError", + message=str(exc), + title="Synthetic transport error", + type_uri="https://docs.pipelex.com/latest/errors/synthetic-transport-error/", + error_category="transient", + error_domain=ErrorDomain.RUNTIME, + retryable=True, + ) + + +def _synthetic_http_error_mappers() -> dict[type[Exception], HttpErrorMapperFn]: + """Resolve a mapper the way an installed orchestrator plugin would — through the registrar SPI. + + Builds a `PluginRegistrar`, has a synthetic plugin contribute the mapper via + `add_http_error_mapper`, and reads it back with `get_http_error_mappers` — exercising the real + producer→consumer seam (the same one `api.main` drives at app construction) without installing a + plugin or importing an orchestrator SDK. 
+ """ + registrar = PluginRegistrar(config=get_config()) + registrar.begin_plugin(name="synthetic-transport-plugin", origin=PluginOrigin.BUILTIN, targets_api=PLUGIN_API_VERSION) + registrar.add_http_error_mapper(exc_type_provider=lambda: _SyntheticTransportError, to_error_report=_synthetic_transport_to_report) + return registrar.get_http_error_mappers() def _report_with_retry(*, status_code: int, retry_after_seconds: float | None) -> ErrorReport: @@ -214,10 +262,10 @@ async def corrupt_error_route() -> None: raise _CorruptReportError(msg) -@_router.get("/workflow-error") -async def workflow_error_route() -> None: +@_router.get("/orchestrator-pipelex-error") +async def orchestrator_pipelex_error_route() -> None: msg = "the workflow failed" - raise WorkflowExecutionError(msg) + raise _SyntheticOrchestratorError(msg) @_router.get("/async-execution-not-enabled") @@ -239,10 +287,10 @@ async def pipeline_run_id_conflict_route() -> None: raise PipelineManagerAlreadyExistsError(msg) -@_router.get("/temporal-transport-error") -async def temporal_transport_error_route() -> None: - msg = "temporal cluster unreachable" - raise _FakeTemporalTransportError(msg) +@_router.get("/synthetic-transport-error") +async def synthetic_transport_error_route() -> None: + msg = "orchestrator cluster unreachable" + raise _SyntheticTransportError(msg) @_router.get("/unexpected-error") @@ -376,7 +424,12 @@ async def run_state_unexpected_error_route(request: Request) -> None: raise RuntimeError(msg) -def _build_client(*, raise_server_exceptions: bool = True, disclosure_mode: DisclosureMode = DisclosureMode.VERBOSE) -> TestClient: +def _build_client( + *, + raise_server_exceptions: bool = True, + disclosure_mode: DisclosureMode = DisclosureMode.VERBOSE, + http_error_mappers: dict[type[Exception], HttpErrorMapperFn] | None = None, +) -> TestClient: """Wire a throwaway app with the production handlers and request-id middleware. 
`raise_server_exceptions` must be `False` for tests that hit the catch-all @@ -384,11 +437,13 @@ def _build_client(*, raise_server_exceptions: bool = True, disclosure_mode: Disc its handler, so the `TestClient` would otherwise surface the exception instead of the sanitized response. `disclosure_mode` is the value `register_exception_handlers` captures in the closures it registers for - `PipelexError` / `TemporalError`; the default mirrors production + `PipelexError` / the mapper handlers; the default mirrors production (VERBOSE), tests that exercise STRICT redaction pass it explicitly. + `http_error_mappers` are the orchestrator-plugin mappers to register a + transport handler per `exc_type` for — defaults to none (the agnostic base). """ app = FastAPI() - register_exception_handlers(app, disclosure_mode=disclosure_mode) + register_exception_handlers(app, disclosure_mode=disclosure_mode, http_error_mappers=http_error_mappers) app.include_router(_router) return TestClient(RequestIdMiddleware(app), raise_server_exceptions=raise_server_exceptions) @@ -476,24 +531,38 @@ def test_unexpected_error_falls_back_to_sanitized_500(self): assert "RuntimeError" not in response.text assert "something nobody anticipated" not in response.text - def test_temporal_error_dispatch(self): - client = _build_client() - - # WorkflowExecutionError IS a PipelexError — routed to the PipelexError - # handler, so its error_type is the class name, not the transport label. - workflow_response = client.get("/workflow-error") - assert workflow_response.status_code == 500 - assert workflow_response.json()["error_type"] == "WorkflowExecutionError" - - # A bare temporalio TemporalError — routed to the dedicated handler, - # which authors the transport-transient classification. 
- transport_response = client.get("/temporal-transport-error") + def test_orchestrator_error_dispatch(self): + # The F3 seam: a synthetic plugin contributes a transport mapper through the registrar SPI, + # the app registers one handler per mapped exc_type, and a bare transport error renders the + # plugin's classified ErrorReport — all without importing any orchestrator SDK. + client = _build_client(http_error_mappers=_synthetic_http_error_mappers()) + + # An orchestrator failure that IS a PipelexError — routed to the PipelexError handler (its + # error_type is the class name), NOT the transport mapper handler. + pipelex_response = client.get("/orchestrator-pipelex-error") + assert pipelex_response.status_code == 500 + assert pipelex_response.json()["error_type"] == "_SyntheticOrchestratorError" + + # A bare transport error — routed to the plugin's mapper-backed handler, which renders the + # transport-transient classification the mapper produced. + transport_response = client.get("/synthetic-transport-error") assert transport_response.status_code == 500 transport_body = transport_response.json() - assert transport_body["error_type"] == "TemporalTransportError" + assert transport_body["error_type"] == "SyntheticTransportError" assert transport_body["error_category"] == "transient" assert transport_body["retryable"] is True + def test_unmapped_transport_error_falls_back_to_sanitized_500(self): + # Without the orchestrator plugin's mapper (the agnostic base), the same bare transport error + # has no dedicated handler and collapses to the sanitized catch-all 500 — never leaking its + # class name or message. 
+ response = _build_client(raise_server_exceptions=False).get("/synthetic-transport-error") + assert response.status_code == 500 + body = response.json() + assert body["error_type"] == "InternalServerError" + assert "_SyntheticTransportError" not in response.text + assert "orchestrator cluster unreachable" not in response.text + def test_handler_of_handlers_catches_corrupt_report(self): # When to_error_report() itself raises, the failure escapes the # PipelexError handler; ServerErrorMiddleware funnels it into the @@ -512,8 +581,8 @@ def test_handler_of_handlers_catches_corrupt_report(self): ("GET", "/env-error", None, 500), ("GET", "/input-error", None, 422), ("GET", "/llm-error", None, 429), - ("GET", "/workflow-error", None, 500), - ("GET", "/temporal-transport-error", None, 500), + ("GET", "/orchestrator-pipelex-error", None, 500), + ("GET", "/synthetic-transport-error", None, 500), ("GET", "/corrupt-error", None, 500), ("GET", "/unexpected-error", None, 500), # Cover the API-authored paths (`handle_api_error`) and FastAPI's diff --git a/tests/unit/test_execute_dispatch.py b/tests/unit/test_execute_dispatch.py new file mode 100644 index 0000000..de0e3c3 --- /dev/null +++ b/tests/unit/test_execute_dispatch.py @@ -0,0 +1,183 @@ +"""`/execute` dispatches by orchestration_mode through the OrchestratorRegistry (full synchronous output). + +Pins the dispatch + output-mapping independent of any real backend, with a stub orchestrator: the +runner resolves the deployment's orchestration_mode, dispatches the locally-built PipeJob through the +orchestrator the registry holds for it with `DeliveryMode.BLOCKING`, and rehydrates the orchestrator's +JSON-safe output back into the full PipeOutput the `/execute` response wraps — exercising the real +serialize -> rehydrate round-trip (`serialize_completed_output` -> `hydrate_working_memory`), including +the `graph_spec` `strict=False` re-validation branch. 
Also pins the policy-gated per-request override +(symmetric with `/start`) and the no-orchestrator case (`MissingOrchestratorError`). The boot slot is +never used — every mode dispatches through the per-call registry. (Delivery is endpoint-set, never +requestable, so `/execute` has no fire-and-forget refusal — that axis is `/start`'s.) +""" + +from datetime import UTC, datetime +from typing import Any + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient +from pipelex.core.pipes.pipe_output import PipeOutput +from pipelex.graph.graphspec import GraphSpec +from pipelex.pipe_run.delivery_assignment import DeliveryAssignment +from pipelex.pipe_run.pipe_job import PipeJob +from pipelex.plugins.orchestrator_registry import OrchestratorRegistry +from pipelex.runtime_bridge.delivery_mode import DeliveryMode +from pipelex.runtime_bridge.exceptions import MissingOrchestratorError +from pipelex.runtime_bridge.payloads import PipelexPipeRunOutput +from pipelex.runtime_bridge.serialization import serialize_completed_output +from pytest_mock import MockerFixture + +from api.api_config import ApiConfig +from api.exception_handlers import register_exception_handlers +from api.routes import router as api_router +from api.routes.pipelex.pipeline import ApiRunner +from tests.unit._constants import VALID_MTHDS + +_PIPELINE_NS = "api.routes.pipelex.pipeline" + + +class _StubOrchestrator: + """A backend-agnostic stand-in orchestrator: echoes the job's working memory back as completed output. + + Returning via `serialize_completed_output` is the point — it produces the real JSON-safe + `PipelexPipeRunOutput` (the same shape that crosses the Temporal worker boundary), so the route + exercises the production serialize -> rehydrate round-trip instead of a hand-built payload. It + records the `delivery` it was dispatched with so a test can assert `/execute` always passes BLOCKING. 
+ """ + + def __init__(self, *, graph_spec: GraphSpec | None = None, supports_fire_and_forget: bool = False) -> None: + self.calls: list[dict[str, Any]] = [] + self._graph_spec = graph_spec + self.supports_fire_and_forget = supports_fire_and_forget + + async def run(self, *, pipe_job: PipeJob, delivery_assignment: DeliveryAssignment | None, delivery: DeliveryMode) -> PipelexPipeRunOutput: + self.calls.append({"pipe_code": pipe_job.pipe.code, "delivery_assignment": delivery_assignment, "delivery": delivery}) + return serialize_completed_output( + pipe_output=PipeOutput( + working_memory=pipe_job.get_working_memory(), + pipeline_run_id=pipe_job.job_metadata.pipeline_run_id, + graph_spec=self._graph_spec, + ), + workflow_id=None, + ) + + +def _build_client() -> TestClient: + app = FastAPI() + app.include_router(api_router, prefix="/v1") + register_exception_handlers(app) + return TestClient(app) + + +def _register_stub(mocker: MockerFixture, *, mode: str, stub: _StubOrchestrator) -> None: + """Patch the orchestrator registry so the route's mode lookup finds `stub` for `mode`.""" + registry = OrchestratorRegistry({mode: stub}) + mocker.patch(f"{_PIPELINE_NS}.get_orchestrator_registry", return_value=registry) + + +def _force_config(mocker: MockerFixture, *, mode: str, allow_override: bool) -> None: + """Patch the api config so `resolve_orchestration_mode` sees `mode` as the deployment default + policy.""" + config = ApiConfig(orchestration_mode=mode, allow_request_orchestration_mode_override=allow_override) + mocker.patch(f"{_PIPELINE_NS}.get_api_config", return_value=config) + + +class TestExecuteDispatch: + def test_direct_dispatch_returns_rehydrated_full_output(self, mocker: MockerFixture) -> None: + """`direct` (the packaged default) dispatches through the registry and returns the full output.""" + stub = _StubOrchestrator() + _register_stub(mocker, mode="direct", stub=stub) + + client = _build_client() + response = client.post( + "/v1/execute", + json={"pipe_code": 
"echo", "mthds_contents": [VALID_MTHDS], "inputs": {"text": "hello"}}, + ) + + assert response.status_code == 200, response.text + body = response.json() + assert body["state"] == "COMPLETED" + # The full output survived the serialize -> rehydrate round-trip: the echo input is in the + # rehydrated working memory the /execute response wraps. + root = body["pipe_output"]["working_memory"]["root"] + assert root["text"]["content"]["text"] == "hello" + # The dispatch reached the registered orchestrator; /execute is synchronous, so no delivery + # target and BLOCKING delivery (never the caller's to choose). + assert len(stub.calls) == 1 + assert stub.calls[0]["delivery_assignment"] is None + assert stub.calls[0]["delivery"] is DeliveryMode.BLOCKING + + def test_graph_spec_survives_strict_false_rehydration(self, mocker: MockerFixture) -> None: + """A non-None graph_spec round-trips through the helper's `strict=False` reverse of `model_dump(mode="json")`. + + Pins the most subtle line of `_pipe_output_from_run_output`: the orchestrator dumps `graph_spec` + in JSON mode (so `GraphSpec.created_at`, a `strict=True` datetime, becomes an ISO string), and the + helper must re-validate it with `strict=False` to restore the typed `GraphSpec`. Without the + graph_spec branch exercised, a regression there (strict default, wrong key) would stay green. + """ + graph_spec = GraphSpec(graph_id="g-1", created_at=datetime(2026, 1, 2, 3, 4, 5, tzinfo=UTC)) + stub = _StubOrchestrator(graph_spec=graph_spec) + _register_stub(mocker, mode="direct", stub=stub) + + client = _build_client() + response = client.post( + "/v1/execute", + json={"pipe_code": "echo", "mthds_contents": [VALID_MTHDS], "inputs": {"text": "hello"}}, + ) + + assert response.status_code == 200, response.text + # The graph_spec survived the dump -> strict=False re-validation: it is present in the response. 
+ assert response.json()["pipe_output"]["graph_spec"]["graph_id"] == "g-1" + + def test_per_request_override_honored_when_policy_allows(self, mocker: MockerFixture) -> None: + """With override ON, a per-request orchestration_mode is resolved and dispatched (symmetric with /start).""" + _force_config(mocker, mode="direct", allow_override=True) + stub = _StubOrchestrator() + _register_stub(mocker, mode="temporal", stub=stub) + + client = _build_client() + response = client.post( + "/v1/execute", + json={ + "pipe_code": "echo", + "mthds_contents": [VALID_MTHDS], + "inputs": {"text": "hello"}, + "orchestration_mode": "temporal", + }, + ) + + assert response.status_code == 200, response.text + # The requested (non-default) backend was honored: dispatch reached the temporal-keyed stub. + assert len(stub.calls) == 1 + + def test_forbidden_orchestration_mode_override_is_a_403(self) -> None: + """The route threads orchestration_mode into the same override policy /start uses: a forbidden override is a 403.""" + client = _build_client() + # Packaged default is `direct` with override OFF; forcing a different backend is refused before dispatch. 
+ response = client.post( + "/v1/execute", + json={ + "pipe_code": "echo", + "mthds_contents": [VALID_MTHDS], + "inputs": {"text": "hello"}, + "orchestration_mode": "temporal", + }, + ) + + assert response.status_code == 403, response.text + assert response.headers["content-type"].startswith("application/problem+json") + assert response.json()["error_type"] == "OrchestrationModeOverrideForbidden" + + @pytest.mark.asyncio + async def test_missing_orchestrator_for_resolved_mode_raises(self, mocker: MockerFixture) -> None: + """A resolved mode with no registered orchestrator fails loud with MissingOrchestratorError.""" + mocker.patch(f"{_PIPELINE_NS}.get_orchestrator_registry", return_value=OrchestratorRegistry({})) + + with pytest.raises(MissingOrchestratorError) as exc_info: + await ApiRunner().execute( + pipe_code="echo", + mthds_contents=[VALID_MTHDS], + inputs={"text": "hello"}, + ) + # The packaged default is `direct`; the empty registry holds no orchestrator for it. + assert exc_info.value.mode == "direct" diff --git a/tests/unit/test_orchestration_mode_schema.py b/tests/unit/test_orchestration_mode_schema.py new file mode 100644 index 0000000..aa4be74 --- /dev/null +++ b/tests/unit/test_orchestration_mode_schema.py @@ -0,0 +1,30 @@ +"""The `orchestration_mode` per-request override is documented on every run surface that honors it. + +`/execute` and `/start` both thread a per-request `orchestration_mode` override into the same +override policy (`PipelineApiExtras.orchestration_mode` -> `resolve_orchestration_mode`), so the +committed OpenAPI artifact must advertise the field on BOTH — otherwise a client generated from the +artifact can drive the override on `/execute` but not on `/start`, even though the runtime honors it. +Their bodies are published via inline `openapi_extra` schemas (raw-`Request` parsing, so FastAPI +cannot infer the body), so this asserts the generated `app.openapi()` request-body schema directly. 
+""" + +import pytest +from fastapi import FastAPI + +from api.routes import router as api_router + + +def _build_app() -> FastAPI: + app = FastAPI() + app.include_router(api_router, prefix="/v1") + return app + + +class TestOrchestrationModeSchema: + @pytest.mark.parametrize("path", ["/v1/execute", "/v1/start"]) + def test_request_body_documents_orchestration_mode(self, path: str) -> None: + schema = _build_app().openapi() + request_schema = schema["paths"][path]["post"]["requestBody"]["content"]["application/json"]["schema"] + properties = request_schema["properties"] + assert "orchestration_mode" in properties, f"{path} request schema must document orchestration_mode" + assert properties["orchestration_mode"]["description"], f"{path} orchestration_mode must carry a description" diff --git a/tests/unit/test_protocol_conformance.py b/tests/unit/test_protocol_conformance.py index a14547b7..c8cde1d 100644 --- a/tests/unit/test_protocol_conformance.py +++ b/tests/unit/test_protocol_conformance.py @@ -12,8 +12,8 @@ - A client-supplied `pipeline_run_id` on `/start` is honored (master D11 — this open-source runner accepts it; `StartAck.pipeline_run_id` echoes it back). - The completion-callback E2E (eng-review 5A): `/start` with `callback_urls` - delivers a signed POST to a local in-test receiver. Temporal is replaced by a - fake whose `start` runs the real `DeliveryExecutor` delivery in-process, so + delivers a signed POST to a local in-test receiver. The orchestrator is replaced + by a fake whose `run` performs the real `DeliveryExecutor` delivery in-process, so the wire bytes (headers + JSON payload) are the production delivery path's. 
""" @@ -36,6 +36,8 @@ from pipelex.pipe_run.delivery_assignment import DeliveryAssignment, DeliveryStatus from pipelex.pipe_run.delivery_executor import DeliveryExecutor from pipelex.pipeline.pipeline_response import RunState +from pipelex.runtime_bridge.delivery_mode import DeliveryMode +from pipelex.runtime_bridge.payloads import PipelexPipeRunOutput from typing_extensions import override from api.exception_handlers import register_exception_handlers @@ -154,7 +156,7 @@ def test_start_accepts_client_pipeline_run_id_and_delivers_signed_callback(self, and the completion callback reaches the receiver with a valid `X-Completion-Signature` and a payload carrying the protocol `pipeline_run_id`. - Temporal is replaced by a fake whose `start` immediately runs the REAL + The orchestrator is replaced by a fake whose `run` immediately runs the REAL `DeliveryExecutor` delivery against the captured `DeliveryAssignment` (storage skipped — no pipe output), so headers and payload bytes come from the production delivery code path. The SSRF guards are relaxed for @@ -181,8 +183,15 @@ def test_start_accepts_client_pipeline_run_id_and_delivers_signed_callback(self, test_secret = "conformance-shared-callback-secret" mocker.patch.dict(os.environ, {"COMPLETION_CALLBACK_SECRET": test_secret}) - # --- fake Temporal: deliver the completion in-process --------------------- - async def fake_temporal_start(pipe_job: Any, delivery_assignment: DeliveryAssignment) -> tuple[str, None]: + # --- fake orchestrator: deliver the completion in-process ----------------- + # The runner now dispatches the locally-built PipeJob through the hub's + # OrchestratorRegistry. Stand in a fake orchestrator whose `run` performs the REAL + # DeliveryExecutor delivery and returns a fire-and-forget output (workflow_id set, + # is_completed False), so the delivery wire bytes still come from the production path + # without needing a Temporal cluster or the `pipelex-temporal` plugin. 
+ async def fake_run(*, pipe_job: Any, delivery_assignment: DeliveryAssignment, delivery: DeliveryMode) -> PipelexPipeRunOutput: + # `/start` sets the delivery axis itself — it must reach the orchestrator as FIRE_AND_FORGET. + assert delivery is DeliveryMode.FIRE_AND_FORGET await DeliveryExecutor().execute( pipe_output=None, user_id="anonymous", @@ -190,11 +199,21 @@ async def fake_temporal_start(pipe_job: Any, delivery_assignment: DeliveryAssign delivery_assignment=delivery_assignment, status=DeliveryStatus.COMPLETED, ) - return "wf-conformance-1", None + return PipelexPipeRunOutput( + output_dict={}, + pipeline_run_id=pipe_job.job_metadata.pipeline_run_id, + workflow_id="wf-conformance-1", + is_completed=False, + ) - fake_temporal = mocker.MagicMock() - fake_temporal.start = mocker.AsyncMock(side_effect=fake_temporal_start) - mocker.patch("api.routes.pipelex.pipeline.make_temporal_pipe_run", return_value=fake_temporal) + # The fake stands in for an async-capable backend so `/start`'s capability gate passes (it + # checks `supports_fire_and_forget` BEFORE dispatch — a blocking-only orchestrator would 400). 
+ fake_orchestrator = mocker.MagicMock() + fake_orchestrator.supports_fire_and_forget = True + fake_orchestrator.run = mocker.AsyncMock(side_effect=fake_run) + fake_registry = mocker.MagicMock() + fake_registry.get_optional.return_value = fake_orchestrator + mocker.patch("api.routes.pipelex.pipeline.get_orchestrator_registry", return_value=fake_registry) app = FastAPI(redirect_slashes=False) app.include_router(api_router, prefix="/v1") @@ -224,6 +243,13 @@ async def fake_temporal_start(pipe_job: Any, delivery_assignment: DeliveryAssign assert ack["pipeline_run_id"] == client_pipeline_run_id assert ack["state"] == RunState.STARTED + # The runner resolved the deployment's orchestration mode and dispatched under it: with the + # packaged-default config (`direct`) and no per-request override, the registry is asked for the + # `direct` orchestrator by keyword `mode=` — so a regression that resolved/threaded the wrong + # mode would be caught here, not silently dispatched. + fake_registry.get_optional.assert_called_once_with(mode="direct") + fake_orchestrator.run.assert_awaited_once() + # Exactly one delivery reached the receiver. assert len(_CallbackReceiver.captured) == 1 delivery = _CallbackReceiver.captured[0] diff --git a/tests/unit/test_start_dispatch.py b/tests/unit/test_start_dispatch.py new file mode 100644 index 0000000..21fb927 --- /dev/null +++ b/tests/unit/test_start_dispatch.py @@ -0,0 +1,111 @@ +"""`/start` is HONEST about fire-and-forget: it acks only when the resolved backend is async-capable. + +Fire-and-forget vs blocking is the ENDPOINT's delivery axis, never configured: `/execute` and +`/validate` dispatch with `BLOCKING`, while `/start` dispatches with `FIRE_AND_FORGET`. So `/start` +checks the resolved `orchestration_mode`'s orchestrator BEFORE any library load: an async-capable +orchestrator (`supports_fire_and_forget=True`, e.g. 
`temporal`) acks `202` with its `workflow_id`, +while a blocking-only orchestrator (`supports_fire_and_forget=False`, e.g. the in-process `direct` +base) is refused HONESTLY with a `400` (`StartRequiresAsyncOrchestration`) instead of silently +running blocking and acking. This pins both arms end-to-end on `POST /start`, with a stub orchestrator +registered under the resolved mode (the boot slot is never used). +""" + +from fastapi import FastAPI +from fastapi.testclient import TestClient +from pipelex.pipe_run.delivery_assignment import DeliveryAssignment +from pipelex.pipe_run.pipe_job import PipeJob +from pipelex.plugins.orchestrator_registry import OrchestratorRegistry +from pipelex.runtime_bridge.delivery_mode import DeliveryMode +from pipelex.runtime_bridge.payloads import PipelexPipeRunOutput +from pytest_mock import MockerFixture + +from api.api_config import ApiConfig +from api.exception_handlers import register_exception_handlers +from api.routes import router as api_router +from tests.unit._constants import VALID_MTHDS + +_PIPELINE_NS = "api.routes.pipelex.pipeline" + + +class _RecordingStub: + """Stand-in orchestrator that records its dispatch and returns the configured `workflow_id`. + + `supports_fire_and_forget` is the capability `/start` reads BEFORE dispatch: a True stub models + an async-capable backend (Temporal) that acks immediately with a `workflow_id`; a False stub + models a blocking-only backend (`direct`) that `/start` must refuse honestly. `run` records the + `delivery` it was dispatched with so the async-capable case can assert FIRE_AND_FORGET. 
+ """ + + def __init__(self, *, workflow_id: str | None, supports_fire_and_forget: bool) -> None: + self.calls: list[dict[str, object]] = [] + self._workflow_id = workflow_id + self.supports_fire_and_forget = supports_fire_and_forget + + async def run(self, *, pipe_job: PipeJob, delivery_assignment: DeliveryAssignment | None, delivery: DeliveryMode) -> PipelexPipeRunOutput: + self.calls.append({"delivery_assignment": delivery_assignment, "delivery": delivery}) + return PipelexPipeRunOutput( + output_dict={}, + main_stuff_name=None, + pipeline_run_id=pipe_job.job_metadata.pipeline_run_id, + workflow_id=self._workflow_id, + is_completed=False, + graph_spec_dump=None, + ) + + +def _build_client() -> TestClient: + app = FastAPI() + app.include_router(api_router, prefix="/v1") + register_exception_handlers(app) + return TestClient(app) + + +def _force_config(mocker: MockerFixture, *, mode: str) -> None: + config = ApiConfig(orchestration_mode=mode, allow_request_orchestration_mode_override=False) + mocker.patch(f"{_PIPELINE_NS}.get_api_config", return_value=config) + + +def _register_stub(mocker: MockerFixture, *, mode: str, stub: _RecordingStub) -> None: + mocker.patch(f"{_PIPELINE_NS}.get_orchestrator_registry", return_value=OrchestratorRegistry({mode: stub})) + + +class TestStartCapabilityGate: + """End-to-end: `POST /start` acks only when the resolved backend can honor fire-and-forget.""" + + def test_start_acks_when_orchestrator_is_async_capable(self, mocker: MockerFixture) -> None: + # A `temporal` deployment whose orchestrator is async-capable: /start dispatches FIRE_AND_FORGET + # and acks 202 with the workflow_id the orchestrator returns immediately. 
+ _force_config(mocker, mode="temporal") + stub = _RecordingStub(workflow_id="wf-123", supports_fire_and_forget=True) + _register_stub(mocker, mode="temporal", stub=stub) + + response = _build_client().post( + "/v1/start", + json={"pipe_code": "echo", "mthds_contents": [VALID_MTHDS], "inputs": {"text": "hello"}}, + ) + + assert response.status_code == 202, response.text + body = response.json() + assert body["state"] == "STARTED" + assert body["workflow_id"] == "wf-123" + assert len(stub.calls) == 1 + # /start sets the delivery axis itself — the caller never chooses it. + assert stub.calls[0]["delivery"] is DeliveryMode.FIRE_AND_FORGET + + def test_start_honestly_400s_when_orchestrator_is_blocking_only(self, mocker: MockerFixture) -> None: + # The orchestrator-agnostic base (`direct`) is blocking-only: /start refuses honestly with a + # 400 BEFORE any dispatch (the capability check runs before pipeline_run_setup), so the stub's + # `run` is never awaited — no silent block-and-ack. + _force_config(mocker, mode="direct") + stub = _RecordingStub(workflow_id=None, supports_fire_and_forget=False) + _register_stub(mocker, mode="direct", stub=stub) + + response = _build_client().post( + "/v1/start", + json={"pipe_code": "echo", "mthds_contents": [VALID_MTHDS], "inputs": {"text": "hello"}}, + ) + + assert response.status_code == 400, response.text + assert response.headers["content-type"].startswith("application/problem+json") + assert response.json()["error_type"] == "StartRequiresAsyncOrchestration" + assert stub.calls == [] diff --git a/tests/unit/test_validate_dispatch.py b/tests/unit/test_validate_dispatch.py new file mode 100644 index 0000000..3dbbf4a --- /dev/null +++ b/tests/unit/test_validate_dispatch.py @@ -0,0 +1,144 @@ +"""`/validate` dispatches by orchestration_mode through the BundleValidatorRegistry (verdict-as-value). 
+ +Pins the dispatch+route mapping independent of any real backend, with a stub validator registered +for a non-direct mode (no `temporalio` import): the runner resolves the deployment's orchestration_mode, +dispatches to the validator the registry holds for it, and the route maps the *returned* verdict — +an `ErrorReport` to a 200 `InvalidReport`, a raised fault to a 5xx. Also pins the no-validator case +(`MissingBundleValidatorError`) and that the route threads `orchestration_mode` into the same override +policy `/start` uses (a forbidden override is a 403). The `direct` path is covered end-to-end by the +existing `/validate` suite; this proves the dispatch is backend-agnostic, not direct-only. +""" + +from collections.abc import Sequence +from pathlib import Path +from typing import Any + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient +from pipelex.base_exceptions import ErrorReport, PipelexConfigError, ValidationErrorCategory, ValidationErrorItem +from pipelex.plugins.bundle_validator_registry import BundleValidatorRegistry +from pipelex.runtime_bridge.exceptions import MissingBundleValidatorError +from pytest_mock import MockerFixture + +from api.api_config import ApiConfig +from api.exception_handlers import register_exception_handlers +from api.routes import router as api_router +from api.routes.pipelex.pipeline import ApiRunner +from tests.unit._constants import VALID_MTHDS + +_PIPELINE_NS = "api.routes.pipelex.pipeline" + + +class _StubBundleValidator: + """A backend-agnostic stand-in validator: records its call, then returns or raises as configured.""" + + def __init__(self, *, verdict: ErrorReport | None = None, error: Exception | None = None) -> None: + self._verdict = verdict + self._error = error + self.calls: list[dict[str, Any]] = [] + + async def validate_bundles( + self, + *, + mthds_contents: list[str], + mthds_sources: list[str] | None, + allow_signatures: bool, + library_dirs: Sequence[Path] | None, + ) 
-> ErrorReport: + self.calls.append( + { + "mthds_contents": mthds_contents, + "mthds_sources": mthds_sources, + "allow_signatures": allow_signatures, + "library_dirs": library_dirs, + } + ) + if self._error is not None: + raise self._error + assert self._verdict is not None + return self._verdict + + +def _build_client() -> TestClient: + app = FastAPI() + app.include_router(api_router, prefix="/v1") + register_exception_handlers(app) + return TestClient(app) + + +def _register_stub_for_temporal(mocker: MockerFixture, stub: _StubBundleValidator) -> None: + """Make the deployment default to the `temporal` mode and register `stub` for it — no temporalio import. + + Patches the api config (so the real `resolve_orchestration_mode` returns the temporal mode by default) + and the bundle-validator registry (so the route's mode lookup finds the stub). + """ + temporal_config = ApiConfig(orchestration_mode="temporal", allow_request_orchestration_mode_override=False) + mocker.patch(f"{_PIPELINE_NS}.get_api_config", return_value=temporal_config) + registry = BundleValidatorRegistry({"temporal": stub}) + mocker.patch(f"{_PIPELINE_NS}.get_bundle_validator_registry", return_value=registry) + + +class TestValidateDispatch: + def test_non_direct_validators_invalid_verdict_maps_to_200_invalid_report(self, mocker: MockerFixture) -> None: + """A non-direct validator's returned ErrorReport is mapped to a 200 InvalidReport, threading the request.""" + invalid_verdict = ErrorReport( + error_type="ValidateBundleError", + message="bundle is invalid", + title="Validate bundle error", + type_uri="https://errors.pipelex.com/validate-bundle-error/", + validation_errors=[ValidationErrorItem(category=ValidationErrorCategory.BLUEPRINT_VALIDATION, message="bad ref")], + ) + stub = _StubBundleValidator(verdict=invalid_verdict) + _register_stub_for_temporal(mocker, stub) + + client = _build_client() + response = client.post("/v1/validate", json={"mthds_contents": [VALID_MTHDS], "allow_signatures": 
True}) + + assert response.status_code == 200, response.text + body = response.json() + assert body["is_valid"] is False + assert any(item["category"] == ValidationErrorCategory.BLUEPRINT_VALIDATION for item in body["validation_errors"]) + # The dispatch reached the registered non-direct validator, with the request threaded through. + assert len(stub.calls) == 1 + assert stub.calls[0]["mthds_contents"] == [VALID_MTHDS] + assert stub.calls[0]["allow_signatures"] is True + + def test_validator_fault_propagates_as_5xx(self, mocker: MockerFixture) -> None: + """A validator that raises a genuine fault (no verdict produced) propagates to the global 5xx handler.""" + stub = _StubBundleValidator(error=PipelexConfigError("backend wiring fault")) + _register_stub_for_temporal(mocker, stub) + + client = _build_client() + response = client.post("/v1/validate", json={"mthds_contents": [VALID_MTHDS]}) + + assert response.status_code >= 500, response.text + assert response.headers["content-type"].startswith("application/problem+json") + assert "is_valid" not in response.json() + + @pytest.mark.asyncio + async def test_missing_validator_for_resolved_mode_raises(self, mocker: MockerFixture) -> None: + """A resolved mode with no registered validator fails loud with MissingBundleValidatorError.""" + mocker.patch(f"{_PIPELINE_NS}.get_bundle_validator_registry", return_value=BundleValidatorRegistry({})) + + with pytest.raises(MissingBundleValidatorError) as exc_info: + await ApiRunner().validate_verdict( + mthds_contents=[VALID_MTHDS], + mthds_sources=None, + allow_signatures=False, + requested_orchestration_mode=None, + ) + assert exc_info.value.mode == "direct" + + def test_forbidden_orchestration_mode_override_is_a_403(self) -> None: + """The route threads orchestration_mode into the same override policy /start uses: a forbidden override is a 403.""" + client = _build_client() + # Packaged default is `direct` with override OFF; forcing a different backend is refused before dispatch. 
+ response = client.post( + "/v1/validate", + json={"mthds_contents": [VALID_MTHDS], "orchestration_mode": "temporal"}, + ) + + assert response.status_code == 403, response.text + assert response.headers["content-type"].startswith("application/problem+json") + assert response.json()["error_type"] == "OrchestrationModeOverrideForbidden" diff --git a/tests/unit/test_validate_envelope.py b/tests/unit/test_validate_envelope.py index c5575d7..3a7cbf6 100644 --- a/tests/unit/test_validate_envelope.py +++ b/tests/unit/test_validate_envelope.py @@ -1,28 +1,21 @@ -"""`/validate` envelope contract — the canonical report + wire extras, on both backends. +"""`/validate` envelope contract — the canonical report + wire extras. Phase 2 of the MTHDS Protocol surface alignment: `/validate` routes through `ApiRunner.validate`, so the HTTP envelope is the canonical `PipelexValidationReport` (`bundle_blueprint`, `pipe_io_contracts` keyed by namespaced `pipe_ref`, `validated_pipes`, `pending_signatures` + `is_runnable`, best-effort `graph_spec`) plus this server's wire-only extras (`mthds_contents` echo, `message`). The valid verdict carries `is_valid: true` -(the discriminant of the 200 response union — the `success` extra is retired). The direct backend -runs the real in-process validation; the Temporal backend is exercised by faking the dispatch with -a realistic worker result and asserting the pure dispatch + map contract (D10): same canonical -envelope, zero API-side library acquisition. +(the discriminant of the 200 response union — the `success` extra is retired). Validation runs +DIRECT in-process on the orchestrator-agnostic base (F2): the real in-process validation, with no +orchestrator-backend selection. Includes the D2 regression pin: a bundle that declares no `main_pipe` validates with 200 and -`graph_spec=null` on BOTH backends — the former 422 precondition is deleted. +`graph_spec=null` — the former 422 precondition is deleted. 
""" -from typing import Any - from fastapi import FastAPI from fastapi.testclient import TestClient -from pipelex.hub import get_library_manager -from pipelex.pipeline.bundle_validator import DryRunOutput, DryRunStatus -from pipelex.pipeline.pipe_io_contracts import IOMultiplicity, PipeInputContract, PipeIOContract, PipeOutputContract -from pipelex.temporal.tprl_pipe.act_dry_validate import DryValidateResult -from pytest_mock import MockerFixture +from pipelex.pipeline.bundle_validator import DryRunStatus from api.exception_handlers import register_exception_handlers from api.routes import router as api_router @@ -36,17 +29,6 @@ def _build_client() -> TestClient: return TestClient(app) -def _enable_fake_temporal_backend(mocker: MockerFixture, worker_result: DryValidateResult) -> Any: - """Flip ApiRunner.validate onto its Temporal arm with a faked worker round-trip.""" - fake_config = mocker.MagicMock() - fake_config.temporal.is_enabled = True - mocker.patch("api.routes.pipelex.pipeline.get_config", return_value=fake_config) - return mocker.patch( - "api.routes.pipelex.pipeline.dispatch_dry_validate", - new=mocker.AsyncMock(return_value=worker_result), - ) - - class TestValidateEnvelope: def test_direct_success_envelope_is_canonical_report_plus_wire_extras(self): client = _build_client() @@ -106,45 +88,3 @@ def test_direct_no_main_pipe_returns_200_without_graph(self): assert body["validated_pipes"] == [{"pipe_ref": "nomain.echo", "status": DryRunStatus.SUCCESS}] assert body["is_runnable"] is True assert body["is_valid"] is True - - def test_temporal_backend_is_pure_dispatch_and_map(self, mocker: MockerFixture): - # D10: everything worker-computed rides DryValidateResult; the API side parses - # blueprints and assembles the SAME canonical envelope — no library acquisition. - # Uses the no-main_pipe bundle so this also pins D2 on the Temporal arm (the old - # route 422'd AFTER the dispatch). 
- worker_result = DryValidateResult( - dry_run_outputs={"nomain.echo": DryRunOutput(pipe_code="echo", pipe_ref="nomain.echo", status=DryRunStatus.SUCCESS)}, - graph_spec=None, - pending_signatures=[], - pipe_io_contracts={ - "nomain.echo": PipeIOContract( - inputs={"text": PipeInputContract(concept_ref="native.Text", json_schema={"type": "string"})}, - output=PipeOutputContract(concept_ref="native.Text", multiplicity=IOMultiplicity.SINGLE), - ) - }, - ) - dispatch_mock = _enable_fake_temporal_backend(mocker, worker_result) - open_spy = mocker.spy(get_library_manager(), "open_library") - - client = _build_client() - response = client.post("/v1/validate", json={"mthds_contents": [NO_MAIN_PIPE_MTHDS]}) - - assert response.status_code == 200, response.text - body = response.json() - # Same canonical envelope as the direct backend, fed by the worker's artifacts. - assert body["bundle_blueprint"]["domain"] == "nomain" - assert body["pipe_io_contracts"]["nomain.echo"]["inputs"]["text"]["json_schema"] == {"type": "string"} - assert body["validated_pipes"] == [{"pipe_ref": "nomain.echo", "status": DryRunStatus.SUCCESS}] - assert body["pending_signatures"] == [] - assert body["is_runnable"] is True - assert body["graph_spec"] is None - assert body["mthds_contents"] == [NO_MAIN_PIPE_MTHDS] - assert body["is_valid"] is True - - # ONE dispatch carrying the request verbatim... - dispatch_mock.assert_awaited_once() - dispatched_arg = dispatch_mock.await_args.args[0] - assert dispatched_arg.mthds_contents == [NO_MAIN_PIPE_MTHDS] - assert dispatched_arg.allow_signatures is False - # ...and ZERO API-side library loads (D10's point — the worker already had one). 
- assert open_spy.call_count == 0 diff --git a/tests/unit/test_validate_errors.py b/tests/unit/test_validate_errors.py index 1c4b6e8..1365a91 100644 --- a/tests/unit/test_validate_errors.py +++ b/tests/unit/test_validate_errors.py @@ -6,8 +6,8 @@ maps to per-line problems), built by pipelex's one shared builder; the structural artifacts (`bundle_blueprint`, `pipe_io_contracts`, `graph_spec`, `validated_pipes`) are absent on the invalid arm. Non-2xx is reserved for *no-verdict* conditions: a malformed body or an -`mthds_sources`/`mthds_contents` length mismatch is a request-shape **422**; a genuine Temporal -workflow fault is a **5xx**. +`mthds_sources`/`mthds_contents` length mismatch is a request-shape **422**; a server fault that +is not a produced verdict (a host-wiring `PipelexError` / `PipelexUnexpectedError`) is a **5xx**. When the caller sends `mthds_sources` parallel to `mthds_contents`, each name is threaded onto `blueprint.source`, so pipe/concept and blueprint errors name the owning file on BOTH the invalid @@ -21,21 +21,17 @@ from fastapi import FastAPI from fastapi.testclient import TestClient -from pipelex.base_exceptions import ValidationErrorCategory +from pipelex.base_exceptions import PipelexConfigError, ValidationErrorCategory from pipelex.core.bundles.exceptions import PipelexBundleBlueprintValidationErrorData from pipelex.core.exceptions import PipeFactoryErrorData, PipesAndConceptValidationErrorData from pipelex.core.pipes.exceptions import PipeFactoryErrorType, PipeValidationErrorType -from pipelex.pipeline.bundle_validator import DryRunOutput, DryRunStatus from pipelex.pipeline.exceptions import ValidateBundleError -from pipelex.pipeline.pipe_io_contracts import IOMultiplicity, PipeInputContract, PipeIOContract, PipeOutputContract -from pipelex.temporal.exceptions import WorkflowExecutionError -from pipelex.temporal.tprl_pipe.act_dry_validate import DryValidateResult from pytest_mock import MockerFixture from api.exception_handlers import 
register_exception_handlers from api.routes import router as api_router from api.routes.pipelex.pipeline import ApiRunner -from tests.unit._constants import INVALID_MAIN_PIPE_MTHDS, NO_MAIN_PIPE_MTHDS, VALID_MTHDS +from tests.unit._constants import INVALID_MAIN_PIPE_MTHDS, VALID_MTHDS # Structural artifacts that exist only on the valid arm — the invalid arm must NOT carry them. _STRUCTURAL_FIELDS = ("bundle_blueprint", "pipe_io_contracts", "graph_spec", "validated_pipes") @@ -136,8 +132,9 @@ def test_validation_errors_carry_threaded_source(self): def test_all_categories_project_onto_invalid_report(self, mocker: MockerFixture): # Every structured category lands on the 200 InvalidReport, and collectively the items cover # the full ValidationErrorItem field set (so a dropped field would fail here, not silently - # vanish at the exception->wire boundary). - mocker.patch.object(ApiRunner, "validate", new=mocker.AsyncMock(side_effect=_multi_category_error())) + # vanish at the verdict->wire boundary). The runner returns the invalid verdict as a value + # (an ErrorReport) — here the ValidateBundleError's own `to_error_report()` projection. + mocker.patch.object(ApiRunner, "validate_verdict", new=mocker.AsyncMock(return_value=_multi_category_error().to_error_report())) client = _build_client() response = client.post("/v1/validate", json={"mthds_contents": [VALID_MTHDS]}) @@ -176,7 +173,7 @@ def test_dry_run_residual_becomes_single_dry_run_item(self, mocker: MockerFixtur # message — the structured-info invariant (never a bare detail with an empty list). It is # graph-level, so it carries no `source`. 
residual = ValidateBundleError(message="Dry run failed: boom.", dry_run_error_message="Dry run failed: boom.") - mocker.patch.object(ApiRunner, "validate", new=mocker.AsyncMock(side_effect=residual)) + mocker.patch.object(ApiRunner, "validate_verdict", new=mocker.AsyncMock(return_value=residual.to_error_report())) client = _build_client() response = client.post("/v1/validate", json={"mthds_contents": [VALID_MTHDS]}) @@ -191,31 +188,20 @@ def test_dry_run_residual_becomes_single_dry_run_item(self, mocker: MockerFixtur assert dry_run_item["message"] == "Dry run failed: boom." assert "source" not in dry_run_item - def test_temporal_recovered_validation_error_is_200_invalid_report(self, mocker: MockerFixture): - # On the Temporal arm a content verdict crosses the activity boundary as a - # WorkflowExecutionError that recovers the original ValidateBundleError report. The route - # detects the recovered verdict and answers 200 InvalidReport — never a 422/5xx. - recovered_report = _multi_category_error().to_error_report() - workflow_error = WorkflowExecutionError("workflow failed", error_report=recovered_report) - mocker.patch.object(ApiRunner, "validate", new=mocker.AsyncMock(side_effect=workflow_error)) - client = _build_client() - response = client.post("/v1/validate", json={"mthds_contents": [VALID_MTHDS]}) - - assert response.status_code == 200, response.text - body = response.json() - assert body["is_valid"] is False - assert {item["category"] for item in body["validation_errors"]} - - def test_genuine_workflow_fault_is_not_a_verdict(self, mocker: MockerFixture): - # A WorkflowExecutionError that recovers NO ValidateBundleError report is a server fault, not - # a verdict the client submitted — it must propagate to the global problem+json handler - # (non-2xx), never masquerade as a 200 InvalidReport. 
- mocker.patch.object(ApiRunner, "validate", new=mocker.AsyncMock(side_effect=WorkflowExecutionError("cluster unreachable"))) + def test_non_verdict_failure_is_not_a_200_verdict(self, mocker: MockerFixture): + # The runner returns a produced verdict (valid report | invalid ErrorReport) as a value; only + # a no-verdict fault propagates. A genuine host-wiring/server fault must reach the global + # problem+json handler as a 5xx, never be masqueraded as a 200 verdict. (Route invariant: the + # route maps the returned verdict and lets a raised fault propagate.) + mocker.patch.object(ApiRunner, "validate_verdict", new=mocker.AsyncMock(side_effect=PipelexConfigError("host wiring fault"))) client = _build_client() response = client.post("/v1/validate", json={"mthds_contents": [VALID_MTHDS]}) assert response.status_code >= 500, response.text assert response.headers["content-type"].startswith("application/problem+json") + # Not a verdict body: the discriminated-union arms always carry `is_valid`; a no-verdict + # problem document does not. + assert "is_valid" not in response.json() def test_valid_bundle_threads_source_onto_blueprint(self): client = _build_client() @@ -244,39 +230,3 @@ def test_length_mismatch_is_a_request_error(self): assert response.status_code == 422, response.text assert response.headers["content-type"].startswith("application/problem+json") assert "mthds_sources" in response.text - - def test_temporal_backend_threads_mthds_sources_through_dispatch(self, mocker: MockerFixture): - # Issue 5 on the Temporal arm: the per-content names ride the dispatched DryValidateArg - # AND the API-side blueprint parse threads them onto `bundle_blueprint.source`. 
- worker_result = DryValidateResult( - dry_run_outputs={"nomain.echo": DryRunOutput(pipe_code="echo", pipe_ref="nomain.echo", status=DryRunStatus.SUCCESS)}, - graph_spec=None, - pending_signatures=[], - pipe_io_contracts={ - "nomain.echo": PipeIOContract( - inputs={"text": PipeInputContract(concept_ref="native.Text", json_schema={"type": "string"})}, - output=PipeOutputContract(concept_ref="native.Text", multiplicity=IOMultiplicity.SINGLE), - ) - }, - ) - fake_config = mocker.MagicMock() - fake_config.temporal.is_enabled = True - mocker.patch("api.routes.pipelex.pipeline.get_config", return_value=fake_config) - dispatch_mock: Any = mocker.patch( - "api.routes.pipelex.pipeline.dispatch_dry_validate", - new=mocker.AsyncMock(return_value=worker_result), - ) - - client = _build_client() - response = client.post( - "/v1/validate", - json={"mthds_contents": [NO_MAIN_PIPE_MTHDS], "mthds_sources": ["api://nomain.mthds"]}, - ) - - assert response.status_code == 200, response.text - body = response.json() - assert body["is_valid"] is True - assert body["bundle_blueprint"]["source"] == "api://nomain.mthds" - dispatch_mock.assert_awaited_once() - dispatched_arg = dispatch_mock.await_args.args[0] - assert dispatched_arg.mthds_sources == ["api://nomain.mthds"] diff --git a/uv.lock b/uv.lock index 091dea6..cc2700b 100644 --- a/uv.lock +++ b/uv.lock @@ -1987,18 +1987,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9e/c9/b2622292ea83fbb4ec318f5b9ab867d0a28ab43c5717bb85b0a5f6b3b0a4/networkx-3.6.1-py3-none-any.whl", hash = "sha256:d47fbf302e7d9cbbb9e2555a0d267983d2aa476bac30e90dfbe5669bd57f3762", size = 2068504, upload-time = "2025-12-08T17:02:38.159Z" }, ] -[[package]] -name = "nexus-rpc" -version = "1.4.0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "typing-extensions" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/35/d5/cd1ffb202b76ebc1b33c1332a3416e55a39929006982adc2b1eb069aaa9b/nexus_rpc-1.4.0.tar.gz", hash = 
"sha256:3b8b373d4865671789cc43623e3dc0bcbf192562e40e13727e17f1c149050fba", size = 82367, upload-time = "2026-02-25T22:01:34.053Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/11/52/6327a5f4fda01207205038a106a99848a41c83e933cd23ea2cab3d2ebc6c/nexus_rpc-1.4.0-py3-none-any.whl", hash = "sha256:14c953d3519113f8ccec533a9efdb6b10c28afef75d11cdd6d422640c40b3a49", size = 29645, upload-time = "2026-02-25T22:01:33.122Z" }, -] - [[package]] name = "nodeenv" version = "1.10.0" @@ -2324,8 +2312,8 @@ wheels = [ [[package]] name = "pipelex" -version = "0.35.0" -source = { registry = "https://pypi.org/simple" } +version = "0.35.1" +source = { git = "https://github.com/Pipelex/pipelex.git?rev=50ff1f79f1bf38a81cf6f9c16063e32ed51b1e57#50ff1f79f1bf38a81cf6f9c16063e32ed51b1e57" } dependencies = [ { name = "aiofiles" }, { name = "datamodel-code-generator" }, @@ -2364,10 +2352,6 @@ dependencies = [ { name = "typing-extensions" }, { name = "urllib3" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/0e/b2/1d2efb7c17e5b7a1c3423fe6e4c81e8a3a364212a04dd8a8b05aafc64d13/pipelex-0.35.0.tar.gz", hash = "sha256:27bffd7a570613039a970dcaa6b79319ef1244cea3180bd750167cfec0537f92", size = 1030986, upload-time = "2026-06-18T21:40:49.787Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/63/ec/66339936126225e3aa9c919dd3c0dc994122c402261e48851942fb718daf/pipelex-0.35.0-py3-none-any.whl", hash = "sha256:8e74510fd8040fbf269811803a8cbd6452cafba6c2acb7743904c5004cb6938d", size = 1464272, upload-time = "2026-06-18T21:40:47.683Z" }, -] [package.optional-dependencies] anthropic = [ @@ -2392,10 +2376,6 @@ google-genai = [ mistralai = [ { name = "mistralai" }, ] -temporal = [ - { name = "aiohttp" }, - { name = "temporalio" }, -] [[package]] name = "pipelex-api" @@ -2404,7 +2384,7 @@ source = { editable = "." 
} dependencies = [ { name = "fastapi" }, { name = "mthds" }, - { name = "pipelex", extra = ["anthropic", "bedrock", "fal", "google", "google-genai", "mistralai", "temporal"] }, + { name = "pipelex", extra = ["anthropic", "bedrock", "fal", "google", "google-genai", "mistralai"] }, { name = "pyjwt" }, { name = "uvicorn" }, ] @@ -2453,7 +2433,7 @@ requires-dist = [ { name = "mthds", specifier = ">=0.5.0" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.11.2" }, { name = "pandas-stubs", marker = "extra == 'dev'", specifier = ">=2.2.3.241126" }, - { name = "pipelex", extras = ["mistralai", "anthropic", "google", "google-genai", "bedrock", "fal", "temporal"], specifier = "==0.35.0" }, + { name = "pipelex", extras = ["mistralai", "anthropic", "google", "google-genai", "bedrock", "fal"], git = "https://github.com/Pipelex/pipelex.git?rev=50ff1f79f1bf38a81cf6f9c16063e32ed51b1e57" }, { name = "pyjwt", specifier = ">=2.10.1" }, { name = "pylint", marker = "extra == 'dev'", specifier = ">=3.3.8" }, { name = "pyright", marker = "extra == 'dev'", specifier = ">=1.1.405" }, @@ -3251,25 +3231,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/16/42/56d31c5ee52dab0ad893d67d4f9c00f5ba2b4c5d87f392eca2c3fdce01cf/starlette-1.3.0-py3-none-any.whl", hash = "sha256:ff4ca1bc23de6a45cdfbbeb9b3caaea524c9221cdd8a6684ad7a4f651a83890b", size = 73492, upload-time = "2026-06-11T06:27:40.444Z" }, ] -[[package]] -name = "temporalio" -version = "1.24.0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "nexus-rpc" }, - { name = "protobuf" }, - { name = "types-protobuf" }, - { name = "typing-extensions" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/a7/b1/7d9b3104ab7994e7d49e765b92495aaff44810b1e066c874c284a93ebd55/temporalio-1.24.0.tar.gz", hash = "sha256:e534e2e71b4a721193ec4ff3dae521146d093554bd47a64f5605d4ca33e56718", size = 2040485, upload-time = "2026-03-23T15:33:33.638Z" } -wheels = [ - { url = 
"https://files.pythonhosted.org/packages/84/a9/30517c21d6155bce1c3dc0e420db48da0231230dbc683f40ab6d5fe22b37/temporalio-1.24.0-cp310-abi3-macosx_10_12_x86_64.whl", hash = "sha256:7f11e7b4f4d09bafba499b43188353e23dc128b1fe3f3160014476e3dce70760", size = 12223918, upload-time = "2026-03-23T15:33:05.045Z" }, - { url = "https://files.pythonhosted.org/packages/73/d0/11aa103bde794524008c1850a84e06cde98698395ca1f8b12e1bd2390aa8/temporalio-1.24.0-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:5cff75a0ca922575b808a7fca1b0de38f6eea061f49e026664b8be9d5bb06ab8", size = 11708887, upload-time = "2026-03-23T15:33:11.67Z" }, - { url = "https://files.pythonhosted.org/packages/1d/f4/774b56100e6bb94e3757ec96fb5c2bc62d42defc7d6de0ee35a12273827a/temporalio-1.24.0-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ee7c13b6724dd0c304aa846aecf6da72a8550f4ade40a0a7f6dcc1c92ef35710", size = 12028303, upload-time = "2026-03-23T15:33:18.022Z" }, - { url = "https://files.pythonhosted.org/packages/e5/91/c05d0e9c2432fe8b1ea0d6fae321866ee49a320ad5e494e6ec9424ca5c28/temporalio-1.24.0-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:aa71b9bfa42f951dd04ade97ce7f92ecedee8903047b4b41b122bb8cbd87a337", size = 12375155, upload-time = "2026-03-23T15:33:24.234Z" }, - { url = "https://files.pythonhosted.org/packages/c4/97/5c939e4609c164c8690a3b5a135eb828d531de8ef63ff447a2a439c0b0fb/temporalio-1.24.0-cp310-abi3-win_amd64.whl", hash = "sha256:52f6833647eceddbebcc376e2ea663a9f73b2b3a42675f503aeb27c98fd4daeb", size = 12720174, upload-time = "2026-03-23T15:33:30.826Z" }, -] - [[package]] name = "tenacity" version = "9.1.4" @@ -3531,15 +3492,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/96/0e/d745ce95fc74e34df802010fd0387e33db468179e6ff42b708280ab268c7/types_openpyxl-3.1.5.20260518-py3-none-any.whl", hash = "sha256:e6ca4b116c8b979ed57f3045edcd3d49c25917d6dae99e90358f41322a19d375", size = 165744, upload-time = "2026-05-18T06:03:56.036Z" }, ] 
-[[package]] -name = "types-protobuf" -version = "7.34.1.20260518" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/29/59/e2b13b499d15e6720150c4b1a8d91e31fcacf716b432397475b3151ff7e4/types_protobuf-7.34.1.20260518.tar.gz", hash = "sha256:28cfaded25889cb83ebfb63cfb0a43628f0b6f3785767bec17287dc6468795f2", size = 68936, upload-time = "2026-05-18T06:01:47.332Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/2a/1f/ec5caf72c2e3b688ca3927e0979a04ddad19e1afc4bf1c199bd743e0f419/types_protobuf-7.34.1.20260518-py3-none-any.whl", hash = "sha256:a0a5337413347166439c0e07cbc26c6164d091401c6f01b1dfd8cdb966c4dd8f", size = 85992, upload-time = "2026-05-18T06:01:45.696Z" }, -] - [[package]] name = "types-pyyaml" version = "6.0.12.20260518"