Skip to content

API use plugin system#29

Closed
lchoquel wants to merge 12 commits into
devfrom
feature/Execute-per-request-mode
Closed

API use plugin system#29
lchoquel wants to merge 12 commits into
devfrom
feature/Execute-per-request-mode

Conversation

@lchoquel

@lchoquel lchoquel commented Jun 22, 2026

Copy link
Copy Markdown
Member

Summary by cubic

Make the API orchestrator‑agnostic and add per‑request backend selection. Also fixes startup teardown so a config warmup failure cleanly shuts down the Pipelex singleton.

  • New Features

    • New deployment config in api/api.toml: orchestration_mode (open token; default direct) and allow_request_orchestration_mode_override (default false).
    • Per-request orchestration_mode supported on /execute, /start, and /validate when allowed; unknown tokens are rejected.
    • /execute and /validate run with blocking delivery; /start is fire‑and‑forget and returns 400 when the resolved backend cannot run async (e.g. direct).
    • Dispatch goes through the OrchestratorRegistry and BundleValidatorRegistry; the base imports no orchestrator SDKs.
    • Error handling uses plugin‑provided HTTP error mappers; new error types for override policy and start‑mode capability.
    • OpenAPI/docs updated.
    • Dev: added .codex environment and .gitignore entries for local workflows.
  • Migration

    • The base image no longer includes Temporal; all runs are in‑process by default. To use distributed execution, add an orchestrator plugin (e.g. pipelex-temporal) and set orchestration_mode accordingly in ~/.pipelex/api_override.toml (or env‑specific files).
    • Remove the [temporal] block from .pipelex/pipelex.toml; orchestration is now controlled via api.toml.
    • If you need per-request backend selection, set allow_request_orchestration_mode_override = true.
    • On direct, use /execute until an async‑capable orchestrator is configured; /start will return 400.

Written for commit d4c3095. Summary will update on new commits.

Review in cubic

lchoquel and others added 9 commits June 21, 2026 17:00
…upling)

Make pipelex-api an orchestrator-agnostic runner: the published base names no
orchestrator and runs every pipeline in-process, while distributed execution
(Temporal, Mistral, …) becomes a deployment flavor = base + one orchestrator
plugin + that plugin's activation. `api/` imports no `pipelex.temporal` /
`temporalio`.

- F1 execution: `ApiRunner.start()` builds the run job locally (preserving
  request_id / output_multiplicity / dynamic_output_concept_ref / run
  registration / telemetry) and dispatches through the hub's OrchestratorRegistry
  under the resolved execution_mode — the identical final dispatch the runtime
  bridge performs, fed the rich PipeJob instead of the lossy PipelexPipeRunInput.
  Byte-equivalent to the old make_temporal_pipe_run().start() for the Temporal
  flavor; `direct` runs in-process (workflow_id null).
- F2 validate: always DIRECT in-process; the Temporal dispatch path is gone. The
  runner loads the method library API-side (documented; size a flavor's runner).
- F3 errors: exception handlers consume orchestrator-plugin HTTP-error mappers
  (discovered via build_registrar at app construction) instead of a hardcoded
  temporal handler; base imports no orchestrator SDK.
- Config: new pipelex-api-owned api.toml (execution_mode default `direct`,
  allow_request_execution_mode_override `false`), loaded via core's env-aware
  load_plugin_config. POST /v1/start honors a per-request execution_mode override
  only when the deployment opts in, else 403.
- Deps: dropped the `temporal` extra from pipelex; removed the [temporal] block
  from the base .pipelex/pipelex.toml. Editable `pipelex` pin (../_plugins) for
  local dev against the Phase A/B core — release-time flips to the published pin.

Tests rewritten to the agnostic shape (synthetic-plugin mapper proves the F3
seam without temporalio; conformance mocks the orchestrator registry; new
api_config override-policy tests). Gates green: ruff, pyright 0, mypy 0,
pylint 10, 308 tests, openapi-check.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019aGUg464jUnCsALuf88r89
…e F2

/validate is now execution_mode-aware, mirroring /start: ApiRunner.validate_verdict
resolves the deployment's execution_mode and dispatches through the hub's
BundleValidatorRegistry, returning the verdict as a value — a 200 valid
PipelexValidationReport or a 200 invalid ErrorReport. The route discriminates on
isinstance(verdict, PipelexValidationReport) instead of catching ValidateBundleError.
`direct` validates in-process (F2's behavior, kept for the agnostic base); a `temporal_*`
mode dispatches the whole job to a worker (pipelex-temporal) and assembles the same
canonical report API-side. A per-request execution_mode override is gated by the same
policy as /start (403 when forbidden).

Drops the ApiRunner.validate @OverRide (its only caller was the route); adds
execution_mode to ValidateRequest. Reverses the F2 "always DIRECT in-process" prose
across the route/runner docstrings and docs/pipe-validate.md (the library load is now
flavor-conditional). CHANGELOG [Unreleased] updated in place (F2 never shipped).

Tests: new test_validate_dispatch.py (stub validator proves registry dispatch + verdict
mapping, missing-mode, forbidden-override 403; no temporalio import); the three tests
that mocked the removed validate are re-pointed at validate_verdict. All other /validate
tests pass unchanged (byte-identical verdict wire).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01HgkGZcadLLZfWMH8br4XAZ
…istry (extend F1)

`POST /v1/execute` now selects its backend from the resolved `execution_mode`
(deployment default + optional policy-gated per-request override), symmetric with
`/start` and `/validate`, instead of the boot-global pipe-run slot. `execution_mode`
becomes the single source of truth for top-level dispatch; `boot_orchestrator`
narrows to the execution stack. For a correctly-configured deployment the two
already agree, so the real-world delta is nil.

- `ApiRunner.execute` override: resolve `execution_mode` FIRST (403 on a forbidden
  override, before any library load), refuse a fire-and-forget resolution with a
  400 (`FireAndForgetNotSupported` — `/execute` is synchronous), look up the
  orchestrator (`MissingOrchestratorError` if absent), then inject it as the
  runner's `_pipe_run` and delegate to the base `execute`, which keeps the entire
  run lifecycle (library setup/teardown, tracer close, pipeline-manager cleanup,
  telemetry, error mapping).
- `_OrchestratorPipeRun` (a `PipeRunProtocol` adapter) + `_pipe_output_from_run_output`
  rehydrate the orchestrator's JSON-safe `PipelexPipeRunOutput` back into the rich
  `PipeOutput` the response wraps: `hydrate_working_memory` (run inside the still-open
  run library) + `PipeOutput.model_validate(..., strict=False)` — `strict=False` is
  required to reverse the orchestrator's `model_dump(mode="json")` of `graph_spec`
  (its `created_at` is a strict datetime). The full output is preserved.
- Route threads `requested_execution_mode=extras.execution_mode` (stops discarding
  the parsed extras). New `ErrorType.FIRE_AND_FORGET_NOT_SUPPORTED` (400).
- Tests: `tests/unit/test_execute_dispatch.py` (direct dispatch + full-output
  rehydration, graph_spec strict=False round-trip, per-request override honored,
  forbidden override 403, fire-and-forget 400, missing orchestrator).
- Docs: CHANGELOG, configuration.md (the `execution_mode` vs `boot_orchestrator`
  dual-knob model), pipe-run.md. Regenerated the OpenAPI artifact — which also
  absorbs pre-existing `/validate`-work drift (the `PipelexExecutionMode` component
  + validate `execution_mode` field were never regenerated at the base tip).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_014Bktem3rLWRE2R7dmkEfbf
…table path)

The `pipelex = { path = "../_plugins", editable = true }` source only resolves in
the local workspace; CI checks out pipelex-api alone, so `../_plugins` is absent and
every install-dependent job failed at `uv sync` ("Distribution not found"). Pin
`pipelex` to the unreleased core branch by commit SHA via git+https (the core repo is
public, so no CI credentials are needed) and relock. Flip back to the published
`==<ver>` PyPI pin once the core change ships.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_014Bktem3rLWRE2R7dmkEfbf
…hema

Review feedback (Greptile P1, Codex P2): `/execute` reads `execution_mode` from
the body and honors it, but its OpenAPI request body was generated from
`RunRequest.model_json_schema()`, which doesn't declare the field — so schema-driven
clients couldn't discover or set the per-request override.

Mirror the existing `PipelexApiStartRequest` pattern: add `PipelexApiExecuteRequest
(RunRequest)` declaring `execution_mode`, wire it into the `/execute` route's
`openapi_extra`, and regenerate the artifact. Factor the shared field description into
`_EXECUTION_MODE_DESCRIPTION` so the extras model and the doc model can't drift.

Scope: only `/execute` (the endpoint this PR changes). `/start`'s schema has the same
pre-existing gap (its `execution_mode` consumption predates this PR); left for its
owning work and recorded in wip.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_014Bktem3rLWRE2R7dmkEfbf
Fire-and-forget vs blocking is a property of the endpoint, not the deployment. /execute and /validate dispatch execution_mode as-is (synchronous); /start derives its fire-and-forget sibling (temporal_blocking -> temporal_fire_and_forget; direct/mistral have none). A deployment configures one synchronous execution_mode and every endpoint is coherent — no /execute 400, no forced-direct footgun.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_014Bktem3rLWRE2R7dmkEfbf
…tion

Review-driven follow-up to f5e3ef7.

Reject a fire-and-forget execution_mode as a CONFIGURED value (ApiConfig field
validator keyed on is_fire_and_forget). configuration.md declared
temporal_fire_and_forget unconfigurable but nothing enforced it — a baked value
would 400 every /execute and misroute /validate. Now fails fast at config load
(matching extra="forbid"). A per-request f&f override on /start is unaffected:
it stays gated by the override policy, not this field.

Fix the mistral_native workflow_id claim in the _async_start_mode docstring, the
call-site comment, and configuration.md: mistral_native runs per-call and answers
with a non-null workflow_id (the run id), not None — only direct returns None.

Add a pure-mapping test of _async_start_mode over every mode and a mistral_native
end-to-end /start case (both previously unverified despite the test docstring
claiming the coverage). Switch test_api_config's locked-Temporal helper to the
synchronous temporal_blocking it should always have used, and assert a configured
f&f mode raises.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_014Bktem3rLWRE2R7dmkEfbf
CI's `ruff format --check` gate (separate from `ruff check`) flagged the
multi-line signature in test_start_dispatch.py; the repo's line length keeps it
on one line.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_014Bktem3rLWRE2R7dmkEfbf
…art, /validate (#28)

* refactor(api): split execution_mode into orchestration_mode (open token) + endpoint-set delivery (Phase 3)

Replace the conflated `execution_mode` enum with the two-axis model from core
Phase 1: `orchestration_mode` (an open string token naming only the backend) and
delivery (BLOCKING vs FIRE_AND_FORGET) set by the endpoint, never by the caller.

- ApiConfig: `orchestration_mode: str` + `allow_request_orchestration_mode_override`;
  drop the fire-and-forget field-validator (no f&f token can be configured anymore);
  `resolve_execution_mode` -> `resolve_orchestration_mode` (same default/override/403).
- error_types: `EXECUTION_MODE_OVERRIDE_FORBIDDEN` -> `ORCHESTRATION_MODE_OVERRIDE_FORBIDDEN`;
  delete `FIRE_AND_FORGET_NOT_SUPPORTED`; add `START_REQUIRES_ASYNC_ORCHESTRATION` (400).
- /execute: drop the f&f-reject; dispatch BLOCKING via `_OrchestratorPipeRun(delivery=...)`.
- /start: HONEST capability check before any library load — a blocking-only orchestrator
  (the in-process `direct` base) is refused with a 400 instead of silently blocking and
  acking; dispatch FIRE_AND_FORGET. Delete `_async_start_mode`.
- /validate: thread `requested_orchestration_mode`; validation stays blocking.
- wire models + api.toml + open-token descriptions; regenerate the OpenAPI artifact.
- Pin core editable (../_plugins) so the Phase 1 split is live (flip to a pushed git SHA
  for CI/PR). Update docs + CHANGELOG.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_0154Kn5AhmT3LfrQfA1nWLMG

* build: pin pipelex to the pushed core split git rev so CI can resolve it

CI cannot resolve the editable ../_plugins path pin. Point pipelex at the pushed
core orchestration-mode/delivery split SHA (c48c5773) and relock. Flip back to
editable for local co-dev, or to the ==<ver> PyPI pin once core ships.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_0154Kn5AhmT3LfrQfA1nWLMG

* fix(api): advertise orchestration_mode in the /start OpenAPI request schema

/v1/start already honors a per-request orchestration_mode override at runtime
(parsed by PipelineApiExtras, threaded into runner.start), but its documented
body PipelexApiStartRequest omitted the field while /execute and /validate
expose it — so clients generated from the committed artifact could not drive
the override on /start. Add the field (reusing the shared description, symmetric
with PipelexApiExecuteRequest), regenerate the artifact, and add a regression
test asserting /start <-> /execute request-schema parity.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_012ZHTC4u7JcCHZSfKvZc6LN

---------

Co-authored-by: Louis Choquel <8851983+lchoquel@users.noreply.github.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@greptile-apps

greptile-apps Bot commented Jun 22, 2026

Copy link
Copy Markdown

Greptile Summary

This PR makes pipeline execution choose an orchestration backend per deployment or per allowed request. The main changes are:

  • Added api.toml and ApiConfig for orchestration_mode and override policy.
  • Routed /execute, /start, and /validate through orchestrator and validator registries.
  • Removed direct Temporal imports from the base API path.
  • Added plugin-provided HTTP error mapper registration.
  • Updated request schemas, OpenAPI docs, user docs, and unit tests for orchestration mode.

Confidence Score: 5/5

This looks safe to merge.

  • No blocking issues found in the changed code.

Important Files Changed

Filename Overview
api/api_config.py Adds deployment config loading and per-request orchestration mode override policy.
api/routes/pipelex/pipeline.py Moves execute, start, and validate dispatch through orchestration registries.
api/exception_handlers.py Replaces built-in Temporal exception handling with plugin-provided HTTP error mappers.
api/main.py Warms API config at startup and registers plugin HTTP error mappers.
api/schemas/models.py Adds request-level orchestration mode fields to API schemas.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[API request] --> B[Read request orchestration_mode]
    B --> C[Resolve against api.toml policy]
    C --> D{Endpoint}
    D -->|/execute| E[OrchestratorRegistry.run BLOCKING]
    D -->|/start| F[Check supports_fire_and_forget]
    F --> G[OrchestratorRegistry.run FIRE_AND_FORGET]
    D -->|/validate| H[BundleValidatorRegistry.validate]
    E --> I[Canonical execute response]
    G --> J[Start ack]
    H --> K[Validation verdict envelope]
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
flowchart TD
    A[API request] --> B[Read request orchestration_mode]
    B --> C[Resolve against api.toml policy]
    C --> D{Endpoint}
    D -->|/execute| E[OrchestratorRegistry.run BLOCKING]
    D -->|/start| F[Check supports_fire_and_forget]
    F --> G[OrchestratorRegistry.run FIRE_AND_FORGET]
    D -->|/validate| H[BundleValidatorRegistry.validate]
    E --> I[Canonical execute response]
    G --> J[Start ack]
    H --> K[Validation verdict envelope]
Loading

Reviews (1): Last reviewed commit: "refactor: orchestration_mode + endpoint-..." | Re-trigger Greptile

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 3ddcb31b48

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread api/main.py Outdated
Comment thread pyproject.toml

dependencies = [
"pipelex[mistralai,anthropic,google,google-genai,bedrock,fal,temporal]==0.35.0",
"pipelex[mistralai,anthropic,google,google-genai,bedrock,fal]==0.35.0",

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Publish the actual Pipelex source requirement

This change uses APIs from the unreleased Pipelex git revision, but the package metadata still tells non-uv installers to resolve PyPI pipelex==0.35.0; [tool.uv.sources] is uv-specific and is not part of the dependency requirement consumed by standard pip/build installs. Any environment installing this project without uv source handling will get 0.35.0 and then fail at import/startup when the new pipelex.hub/runtime_bridge symbols are missing, so the dependency should be a PEP 508 direct reference or a released version that contains these APIs.

Useful? React with 👍 / 👎.

Move get_api_config() inside the lifespan try block so a malformed
api.toml / baked override that raises during warmup still triggers
Pipelex.teardown_if_needed() via the finally. Previously the warmup
ran above the try, so a raise left Pipelex.make()'s live, ready
singleton registered and unguarded — a startup retry or second app
in the same process would then fail as "already initialized".

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_012ZHTC4u7JcCHZSfKvZc6LN
@lchoquel lchoquel changed the title Feature/execute per request mode API use plugin system Jun 22, 2026
@lchoquel lchoquel closed this Jun 24, 2026
@lchoquel lchoquel deleted the feature/Execute-per-request-mode branch June 24, 2026 14:53
@github-actions github-actions Bot locked and limited conversation to collaborators Jun 24, 2026
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant