Closed
6 changes: 6 additions & 0 deletions .codex/environments/environment.toml
@@ -0,0 +1,6 @@
# THIS IS AUTOGENERATED. DO NOT EDIT MANUALLY
version = 1
name = "pipelex-api"

[setup]
script = "make install"
8 changes: 6 additions & 2 deletions .gitignore
@@ -51,7 +51,11 @@ temp/

gcp_credentials.json

results/*
.pipelex/storage

# personal pipelex config files that override the default one
pipelex_super.toml
pipelex_override.toml
telemetry_override.toml
.pipelex/storage

.worktreeinclude
57 changes: 0 additions & 57 deletions .pipelex/pipelex.toml
@@ -203,60 +203,3 @@ traces_dir = ".pipelex/traces"
# Configure your own table_name + region in your override file when backend = "dynamodb".
table_name = ""
region = ""

####################################################################################################
# Temporal Config
####################################################################################################

[temporal]
# Temporal is opt-in. With Temporal disabled the API runs pipelines in-process
# (POST /v1/execute) and POST /v1/start is unavailable.
# To enable: add a [temporal] section in your pipelex_override.toml with
# is_enabled = true and define a server under [temporal.temporal_config.temporal_server_configs.<name>].
is_enabled = false

[temporal.worker_config]
workflow_execution_timeout = "1:00:00"
run_timeout = "1:00:00"
task_timeout = "0:00:10"
start_delay = "0:00:00"
rpc_timeout = "1:00:00"

[temporal.worker_config.retry_policy_config]
initial_interval = "0:00:03"
backoff_coefficient = 2.0
maximum_interval = "unlimited"
maximum_attempts = 3
non_retryable_error_types = [
"ExtractHandleNotFoundError",
"FileNotFoundError",
"ImgGenHandleNotFoundError",
"LLMHandleNotFoundError",
"ModelNotFoundError",
"ValidationError",
]

[temporal.temporal_config]
selected_server = "local"

[temporal.temporal_config.temporal_server_configs.local]
description = "default localhost"
target_host = "localhost:7233"
namespace = "default"
api_key_method = "none"
api_key_id = ""

[temporal.temporal_config.temporal_log_config]
is_workflow_info_on_message = false
is_workflow_info_on_extra = true
is_full_workflow_info_on_extra = false
is_activity_info_on_message = false
is_activity_info_on_extra = true
is_full_activity_info_on_extra = false
is_formatter_enabled = true
is_prefix_enabled = true
managed_loggers = [
"temporalio.activity",
"temporalio.workflow",
"temporalio.worker._activity",
]
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,20 @@
# Changelog

## [Unreleased]

Orchestrator-agnostic base. `pipelex-api` no longer hard-wires Temporal: the published base names **no** orchestrator and runs every pipeline in-process, while distributed execution (Temporal, Mistral Workflows, …) becomes a deployment *flavor* = base image + exactly one orchestrator plugin + that plugin's activation. `api/` imports no `pipelex.temporal` / `temporalio`.

### Added
- **`orchestration_mode` deployment config:** a top-level run's backend is read from a new packaged `api.toml` (`orchestration_mode`, default `direct`; `allow_request_orchestration_mode_override`, default `false`), env-layered like the Pipelex config (`api_{env}.toml` / `api_override.toml`). `orchestration_mode` is an **open string token** (core owns `direct`; each orchestrator plugin owns its own, e.g. `temporal`); an unregistered token fails loud at dispatch. `POST /v1/execute`, `POST /v1/start`, and `POST /v1/validate` may each carry a per-request `orchestration_mode` override, honored only when the deployment opts in — otherwise refused with a `403` (`OrchestrationModeOverrideForbidden`). The **delivery** axis (blocking vs fire-and-forget) is endpoint-intrinsic, never configured or requestable. See [Configuration → Orchestration mode](docs/configuration.md).
- **Orchestrator HTTP-error mappers:** the app discovers each installed orchestrator plugin's transport-fault mapper (via the plugin SPI `add_http_error_mapper`) at construction and registers one RFC 7807 handler per mapped exception type — so a plugin's transport fault renders correctly while the base imports no orchestrator SDK.
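
The mapper discovery described in the last item can be pictured with a small sketch. This is not the PR's implementation: the `HandlerTable` class, the `Problem` dataclass, the `FakeTransportError` exception, and the exact signature given to `add_http_error_mapper` are all assumptions made for illustration. Only the idea (one RFC 7807 handler per mapped exception type, with the base importing no orchestrator SDK) comes from the changelog.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Type


@dataclass(frozen=True)
class Problem:
    """The four core RFC 7807 members."""

    type: str
    title: str
    status: int
    detail: str

    def to_dict(self) -> Dict[str, object]:
        return {"type": self.type, "title": self.title, "status": self.status, "detail": self.detail}


class FakeTransportError(Exception):
    """Hypothetical stand-in for an exception owned by an orchestrator plugin."""


Mapper = Callable[[Exception], Problem]


class HandlerTable:
    """Maps exception types to problem builders; plugins contribute the entries."""

    def __init__(self) -> None:
        self._mappers: Dict[Type[Exception], Mapper] = {}

    def add_http_error_mapper(self, exc_type: Type[Exception], mapper: Mapper) -> None:
        # Assumed signature: the real SPI may differ.
        self._mappers[exc_type] = mapper

    def render(self, exc: Exception) -> Dict[str, object]:
        # Walk the MRO so a subclass of a mapped type still matches.
        for klass in type(exc).__mro__:
            mapper = self._mappers.get(klass)
            if mapper is not None:
                return mapper(exc).to_dict()
        # Nothing matched: fall back to a generic 500 document.
        return Problem("about:blank", "InternalServerError", 500, "Unexpected error").to_dict()
```

The table only ever sees exception classes handed to it at registration time, which is how a base could render a plugin's fault without importing the plugin's SDK.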

### Changed
- **Two orthogonal axes — `orchestration_mode` (open token) + delivery (endpoint-set):** the backend (*which orchestrator*) and the wait-semantics (*whether the caller waits*) are now distinct. `orchestration_mode` names only the backend and is an open string token. Delivery is `DeliveryMode {BLOCKING, FIRE_AND_FORGET}`, threaded into the orchestrator by the endpoint and never received from a caller: `/execute` and `/validate` dispatch `BLOCKING`, `/start` dispatches `FIRE_AND_FORGET`. The conflated `execution_mode` enum (which carried `temporal_blocking` / `temporal_fire_and_forget` / `mistral_native`) is gone.
- **`/execute` dispatches by `orchestration_mode`:** `POST /v1/execute` selects its backend from the resolved `orchestration_mode` — dispatching the locally-built run job through the hub's `OrchestratorRegistry` with `BLOCKING` delivery, symmetric with `/start` — instead of the boot-global pipe-run slot. `orchestration_mode` is the single source of truth for top-level dispatch (`boot_orchestrator` narrows to the execution stack). A per-request `orchestration_mode` override is gated by the same `403` policy as `/start`. The full output is preserved across the dispatch: the orchestrator's JSON-safe result is rehydrated back into the canonical `PipeOutput` the response wraps.
- **`/start` is HONEST about fire-and-forget:** `POST /v1/start` builds the run job locally (preserving `request_id`, `output_multiplicity`, `dynamic_output_concept_ref`, run registration, and telemetry) and dispatches it through the hub's `OrchestratorRegistry` with `FIRE_AND_FORGET` delivery. It checks the resolved orchestrator's `supports_fire_and_forget` capability **before** loading a library: an async-capable backend (`temporal`, contributed by the `pipelex-temporal` plugin) acks `202` with its `workflow_id`, while the in-process `direct` base is blocking-only and is refused with a `400` (`StartRequiresAsyncOrchestration`) — use `/execute` — instead of silently running blocking and acking.
- **`/validate` dispatches by `orchestration_mode` (verdict-as-value):** like `/start`, `/validate` resolves the deployment's `orchestration_mode` and dispatches through the hub's `BundleValidatorRegistry`, returning the verdict as a value — a 200 valid `PipelexValidationReport` or a 200 invalid `ErrorReport` (no exception-as-control-flow at the route). Validation is inherently blocking, so there is no delivery axis here. `direct` validates **in-process** on the API side; a `temporal` mode dispatches the whole job to a **worker** (contributed by the `pipelex-temporal` plugin) and assembles the same canonical report API-side from the worker's result. A per-request `orchestration_mode` override is gated by the same policy as `/start` (`403` when forbidden). The verdict wire is byte-identical across backends. Size in-process `/validate` traffic for the library load; on a worker-dispatched flavor the library work happens worker-side.
- **Dependencies:** dropped the `temporal` extra from `pipelex` (`pipelex[mistralai,anthropic,google,google-genai,bedrock,fal]`). The base depends on no orchestrator plugin.
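
A minimal sketch of the two axes described above, under stated assumptions. The names `DeliveryMode`, `OrchestratorRegistry`, and `MissingOrchestratorError` appear in this changelog, but the enum values, the `register` / `dispatch` methods, and the `run_direct` callable are hypothetical and only illustrate that the token picks the backend while the endpoint fixes the delivery.

```python
from enum import Enum
from typing import Callable, Dict


class DeliveryMode(Enum):
    # Member names come from the changelog; the string values are assumed.
    BLOCKING = "blocking"
    FIRE_AND_FORGET = "fire_and_forget"


# Delivery is fixed per endpoint and never read from the request.
ENDPOINT_DELIVERY: Dict[str, DeliveryMode] = {
    "/v1/execute": DeliveryMode.BLOCKING,
    "/v1/validate": DeliveryMode.BLOCKING,
    "/v1/start": DeliveryMode.FIRE_AND_FORGET,
}


class MissingOrchestratorError(LookupError):
    """Stand-in for the error raised when a token has no registered orchestrator."""


Orchestrator = Callable[[str, DeliveryMode], str]


class OrchestratorRegistry:
    """Hypothetical registry keyed by the open `orchestration_mode` token."""

    def __init__(self) -> None:
        self._by_token: Dict[str, Orchestrator] = {}

    def register(self, token: str, orchestrator: Orchestrator) -> None:
        self._by_token[token] = orchestrator

    def dispatch(self, token: str, endpoint: str, job: str) -> str:
        try:
            orchestrator = self._by_token[token]
        except KeyError:
            # An unregistered token fails loudly at dispatch time.
            raise MissingOrchestratorError(f"no orchestrator registered for '{token}'") from None
        return orchestrator(job, ENDPOINT_DELIVERY[endpoint])


def run_direct(job: str, delivery: DeliveryMode) -> str:
    """Toy in-process backend that just reports what it was asked to do."""
    return f"direct ran {job} ({delivery.value})"
```

Registering only `"direct"` and then dispatching with the token `"temporal"` raises `MissingOrchestratorError` in this sketch, which mirrors the "fails loud at dispatch" behavior the entry describes.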

## [v0.5.0] - 2026-06-18

### Added
21 changes: 21 additions & 0 deletions api/api.toml
@@ -0,0 +1,21 @@
# Pipelex-API deployment config — the packaged default, loaded by `load_api_config()`
# (api/api_config.py) via core's env-aware `load_plugin_config` and validated into `ApiConfig`.
# Keys live at the file root (no `[api]` wrapper) — `load_plugin_config` validates the whole merged
# document against the schema, exactly like `pipelex-temporal`'s `temporal.toml`. The packaged
# default here is deep-merged with an optional `api_{env}.toml` / `api_override.toml` at `~/.pipelex`
# then the project `.pipelex` (env selected by `PIPELEX_ENV`). This open-source base names NO
# orchestrator: it ships the in-process `direct` mode and refuses per-request override. A deployment
# flavor bakes its own `.pipelex/api_{env}.toml` to flip this (e.g. `pipelex-api-hosted` sets
# `orchestration_mode = "temporal"`).

# Which orchestrator a top-level run dispatches to, through the orchestrator registry.
# `orchestration_mode` is an OPEN string token: core owns "direct" (in-process); each orchestrator
# plugin owns its own ("temporal" from pipelex-temporal, "mistralai-workflows" from
# pipelex-mistralai-workflows). A token whose plugin is not installed fails loud at dispatch with the
# plugin's install hint. The delivery axis (blocking vs fire-and-forget) is NOT configured here — it
# is set by the endpoint (`/execute` and `/validate` block; `/start` is fire-and-forget).
orchestration_mode = "direct"

# Whether a caller may override `orchestration_mode` per request. Off on the base (and recommended off
# on hosted flavors): a locked-down distributed runner must not be coercible into `direct`.
allow_request_orchestration_mode_override = false
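
The layering that the header comment of `api.toml` describes can be sketched as a deep merge in which later layers win. This is an illustration only: `deep_merge` and `layer` are hypothetical helpers, the dictionaries stand in for parsed TOML documents, and the real `load_plugin_config` may order or validate its tiers differently.

```python
from typing import Any, Dict, Mapping


def deep_merge(base: Mapping[str, Any], override: Mapping[str, Any]) -> Dict[str, Any]:
    """Return a new dict in which `override` wins, merging nested tables key by key."""
    merged: Dict[str, Any] = dict(base)
    for key, value in override.items():
        current = merged.get(key)
        if isinstance(value, Mapping) and isinstance(current, Mapping):
            merged[key] = deep_merge(current, value)
        else:
            merged[key] = value
    return merged


def layer(*documents: Mapping[str, Any]) -> Dict[str, Any]:
    """Fold the documents left to right, so the last one has the final say."""
    result: Dict[str, Any] = {}
    for document in documents:
        result = deep_merge(result, document)
    return result


# Parsed stand-ins: the packaged default and a flavor's baked env file.
packaged_default = {
    "orchestration_mode": "direct",
    "allow_request_orchestration_mode_override": False,
}
hosted_env_file = {"orchestration_mode": "temporal"}

effective = layer(packaged_default, hosted_env_file)
```

In this sketch the flavor file only needs to name the key it flips; the override policy is inherited from the packaged default.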
109 changes: 109 additions & 0 deletions api/api_config.py
@@ -0,0 +1,109 @@
"""Pipelex-API deployment config: the top-level orchestration mode + override policy.

The runner is orchestrator-agnostic. WHICH orchestrator a top-level run dispatches
to — ``direct`` in-process (the base default), ``temporal``, ``mistralai-workflows``,
… — is a *deployment* choice, never a property of this open-source base.
``orchestration_mode`` is an open string token (core owns ``"direct"``; each plugin
owns its own); the *delivery* axis (blocking vs fire-and-forget) is endpoint-set, not
configured here. It is read from a packaged ``api.toml`` (keys at the file root — no
``[api]`` wrapper, since :meth:`load_plugin_config` validates the whole document
against the schema, exactly like ``temporal.toml``), env-layered like the main pipelex
config and every plugin config (D2): the packaged default ``api.toml`` (shipped in this
wheel) is deep-merged with the env-selected ``api_{environment}.toml`` and
``api_override.toml`` from ``~/.pipelex`` then the project ``.pipelex``, with
``PIPELEX_ENV`` (``runtime_manager.environment``) choosing the env file. One image
bakes every env file; a deployment flavor (e.g. ``pipelex-api-hosted``) bakes
``.pipelex/api_{env}.toml`` to flip the default. The base names no orchestrator and
ships ``orchestration_mode = "direct"``.

Why a separate ``api.toml`` and not the core ``pipelex_{env}.toml``: core's
config is ``extra="forbid"``, so an ``[api]`` section there is rejected at load.
Loading it via core's reusable :meth:`load_plugin_config` keeps this a pure
pipelex-api concern while reusing the identical env-layering machinery — and
keeps it symmetric with how ``pipelex-temporal`` self-loads ``temporal.toml``.
"""

from functools import cache
from pathlib import Path

from pipelex.system.configuration.config_loader import config_manager
from pydantic import BaseModel, ConfigDict

from api.error_types import ErrorType
from api.errors import raise_forbidden

API_CONFIG_NAME = "api"

# The packaged default ``api.toml`` ships in the wheel alongside this module.
_PACKAGE_DIR = Path(__file__).resolve().parent


class ApiConfig(BaseModel):
    """The ``[api]`` deployment config: default orchestration mode + override policy.

    No field defaults — the packaged ``api.toml`` is the single source of the
    base defaults (mirroring core's "defaults live in the TOML, never in the
    model" discipline). ``extra="forbid"`` so a typo'd key in a baked override
    fails loud at load instead of being silently ignored.

    ``orchestration_mode`` is an open string token (core owns ``"direct"``; each
    plugin owns its own). It is NOT validated against a closed enum here — an
    unregistered token is refused at dispatch by ``MissingOrchestratorError``,
    the single validation point. The delivery axis (blocking vs fire-and-forget)
    is endpoint-set, never configured, so nothing about wait-semantics lives here.
    """

    model_config = ConfigDict(extra="forbid")

    orchestration_mode: str
    allow_request_orchestration_mode_override: bool


def load_api_config() -> ApiConfig:
    """Load the ``[api]`` config from ``api.toml`` with env-aware layering (D2).

    Delegates to core's reusable plugin-config loader: the packaged ``api.toml``
    is deep-merged with the env-selected overrides. The packaged default alone is
    a valid, fully-resolved config — every override tier is optional. Requires
    Pipelex to be booted (``runtime_manager.environment`` must be resolved), so
    it is called only after ``Pipelex.make`` — never at import.
    """
    return config_manager.load_plugin_config(name=API_CONFIG_NAME, package_dir=_PACKAGE_DIR, schema=ApiConfig)


@cache
def get_api_config() -> ApiConfig:
    """Process-cached :class:`ApiConfig`.

    The config is immutable for the life of the process (``PIPELEX_ENV`` is fixed
    at boot), so it is loaded once and cached. ``api.main`` warms this at startup
    so a malformed ``api.toml`` / baked override fails the app fast — the same
    fail-fast posture as ``ERROR_DISCLOSURE``. Tests that need a different mode
    patch this getter (or call :func:`resolve_orchestration_mode` with a hand-built
    config) rather than mutating the cache.
    """
    return load_api_config()


def resolve_orchestration_mode(requested: str | None, *, config: ApiConfig) -> str:
    """Resolve the effective orchestration mode for a top-level run, applying policy.

    The deployment default (``config.orchestration_mode``) wins unless the caller
    supplied a *different* token AND the deployment opted into per-request
    override (``allow_request_orchestration_mode_override``). A caller-supplied
    token equal to the default is always honored (it changes nothing). A caller
    trying to FORCE a different mode on a runner whose policy forbids it is refused
    with a 403 — so a locked-down Temporal runner can never be coerced into
    ``direct`` (whose whole point would be to bypass distributed execution), and
    vice versa. The token is a plain string compare; an *unregistered* token is
    not rejected here — that surfaces at dispatch as ``MissingOrchestratorError``.
    """
    if requested is None or requested == config.orchestration_mode:
        return config.orchestration_mode
    if config.allow_request_orchestration_mode_override:
        return requested
    msg = (
        f"This deployment does not allow overriding orchestration_mode per request "
        f"(configured mode '{config.orchestration_mode}', requested '{requested}')."
    )
    raise_forbidden(msg, error_type=ErrorType.ORCHESTRATION_MODE_OVERRIDE_FORBIDDEN)
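
The docstring of `get_api_config` suggests that tests call `resolve_orchestration_mode` with a hand-built config. The sketch below shows what such a check might look like without importing `api` or `pipelex`: `SketchConfig`, `OverrideForbidden`, and `resolve` are hypothetical stand-ins that mirror the function above, not the PR's own test code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SketchConfig:
    """Stand-in for ApiConfig with the same two fields."""

    orchestration_mode: str
    allow_request_orchestration_mode_override: bool


class OverrideForbidden(Exception):
    """Stand-in for the 403 that `raise_forbidden` produces in the real module."""

    status_code = 403


def resolve(requested: Optional[str], *, config: SketchConfig) -> str:
    # No request, or a request equal to the default: the default wins.
    if requested is None or requested == config.orchestration_mode:
        return config.orchestration_mode
    # A different token is honored only when the deployment opted in.
    if config.allow_request_orchestration_mode_override:
        return requested
    raise OverrideForbidden(
        f"configured mode '{config.orchestration_mode}', requested '{requested}'"
    )


locked_temporal = SketchConfig("temporal", False)
permissive_direct = SketchConfig("direct", True)
```

Note that with `permissive_direct` the sketch returns any requested token unchanged, including one whose plugin is not installed. As the docstring says, that case is left to dispatch.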
16 changes: 14 additions & 2 deletions api/error_types.py
@@ -17,10 +17,21 @@ class ErrorType(StrEnum):
    # Authentication / authorization
    UNAUTHENTICATED = "Unauthenticated"
    FORBIDDEN = "Forbidden"
    # A caller asked to run in an orchestration_mode this deployment forbids overriding
    # (per-request override is off — see `allow_request_orchestration_mode_override` in api.toml).
    # A 403: the deployment policy refuses to honor the requested mode.
    ORCHESTRATION_MODE_OVERRIDE_FORBIDDEN = "OrchestrationModeOverrideForbidden"
    INVALID_TOKEN = "InvalidToken"
    TOKEN_EXPIRED = "TokenExpired"
    SERVER_MISCONFIGURED = "ServerMisconfigured"

    # A caller hit `/start` on a deployment whose resolved orchestration mode cannot do genuine
    # async (its orchestrator's `supports_fire_and_forget` is False — e.g. the in-process `direct`
    # base). `/start` is fire-and-forget by nature, so rather than silently running blocking and
    # acking, it refuses HONESTLY with a 400: use `/execute` (synchronous) instead. Checked AFTER
    # the override policy, so a forbidden per-request override still 403s first.
    START_REQUIRES_ASYNC_ORCHESTRATION = "StartRequiresAsyncOrchestration"

    # Request validation
    BAD_REQUEST = "BadRequest"
    VALIDATION_ERROR = "ValidationError"
@@ -38,7 +49,8 @@ class ErrorType(StrEnum):
    # Misc
    PACKAGE_NOT_FOUND = "PackageNotFound"
    # The `error_type` for the catch-all 500 emitted by `handle_unexpected_error`
-    # (any failure that is neither a `PipelexError`, a `TemporalError`, nor an
-    # `ApiError`). Stays in this enum so the same `build_problem_document_from_api_error`
+    # (any failure matched by no more-specific handler — not an `ApiError`, a
+    # `RequestValidationError`, a `PipelexError`, or an orchestrator plugin's mapped
+    # transport exception). Stays in this enum so the same `build_problem_document_from_api_error`
    # builder renders it — same shape as every other API-authored 500.
    INTERNAL_SERVER_ERROR = "InternalServerError"
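
The ordering stated in the comment on `START_REQUIRES_ASYNC_ORCHESTRATION` (override policy first, capability check second) can be sketched as a small decision function. `start_status` and its boolean inputs are hypothetical. The status codes and the two `error_type` strings are the ones that appear in this diff.

```python
from typing import Optional, Tuple


def start_status(*, override_forbidden: bool, supports_fire_and_forget: bool) -> Tuple[int, Optional[str]]:
    """Return the (status, error_type) a `/start` call would get in this sketch."""
    # 1. Policy check: a forbidden per-request override is refused first.
    if override_forbidden:
        return 403, "OrchestrationModeOverrideForbidden"
    # 2. Capability check: a blocking-only backend cannot honor fire-and-forget.
    if not supports_fire_and_forget:
        return 400, "StartRequiresAsyncOrchestration"
    # 3. An async-capable backend accepts the job and acknowledges it.
    return 202, None
```

When both conditions fail at once, the sketch returns the 403, which matches the "still 403s first" wording in the comment.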