Skip to content

feat(wirings): stateless tier DataStream wirings for the generated MEOS facades (stacks on #5)#6

Closed
estebanzimanyi wants to merge 1 commit into
MobilityDB:codegen/flink-meos-opsfrom
estebanzimanyi:feat/flink-stateless-tier-wirings
Closed

feat(wirings): stateless tier DataStream wirings for the generated MEOS facades (stacks on #5)#6
estebanzimanyi wants to merge 1 commit into
MobilityDB:codegen/flink-meos-opsfrom
estebanzimanyi:feat/flink-stateless-tier-wirings

Conversation

@estebanzimanyi

Copy link
Copy Markdown
Member

First of four planned tier-specific wiring follow-ups on PR #5's generated MEOS facade. Adds the org.mobilitydb.flink.meos.wirings package with thin, generic Flink-DataStream wrappers organized per streaming tier.

What's in this PR

File Purpose
MeosStatelessMap<IN, OUT> Generic MapFunction wrapping any stateless MeosOps method via a serializable lambda — no per-method boilerplate
MeosStatelessFilter<IN> Generic FilterFunction wrapping any stateless boolean-returning MeosOps method + .fromIntPredicate(...) adapter for JMEOS' int-coded predicates
demo/MeosWiringsDemoJob Runnable end-to-end DataStream pipeline: parse TBox WKT → filter by overlap with a query box → serialize survivors to hex-WKB, all through the generated facades wired via this package
README.md Tier vocabulary, the wrap-once-use-everywhere pattern, DataStream-API-only design rationale, coexistence with berlinmod.MEOSBridge

Tier coverage of this PR

Tier Method count in PR #5 codegen Wrapped here
stateless 804 (92 OO + 712 free) ✅ via MeosStatelessMap / MeosStatelessFilter
bounded-state 797 next follow-up (MeosBoundedStateMap with ValueState<Pointer> per key)
windowed 161 next follow-up (MeosWindowedAggregate over ProcessWindowFunction)
cross-stream 140 next follow-up (MeosCrossStreamJoin with KeyedCoProcessFunction or interval-join)
io-meta 195 covered transitively by MeosStatelessMap (no state, no window)
sequence-only 14 n/a — inherently non-streamable

So this PR alone makes the 804-method stateless slice + the 195-method io-meta slice Flink-DataStream-wirable through one generic helper per pattern. Adopters wire any of the ~1,000 methods through these two classes without per-method registration.

Design choice — DataStream API, not Table API

The repo's existing pipeline (berlinmod/, aisdata/) is DataStream-API only. Sticking to DataStream avoids adding the ~50 MB flink-table-planner runtime dep to the build matrix. A Table-API-shaped sibling (MeosScalarUDF + MeosCatalogRegistrar) is a clean follow-up if/when the repo adopts Table API for other reasons.

How a generated MEOS call becomes a Flink operator

// 1. Pick the generated MeosOps method (Javadoc tier marker tells you which wiring to use)
boolean overlap = MeosOpsFreeCore.overlaps_tbox_tbox(boxA, boxB);  // tier = stateless

// 2. Wrap with the matching wiring
MeosStatelessFilter<TboxPair> filter = MeosStatelessFilter.fromIntPredicate(
    pair -> MeosOpsFreeCore.overlaps_tbox_tbox(pair.a, pair.b));

// 3. Apply to the DataStream
DataStream<TboxPair> overlapping = stream.filter(filter);

MEOS_AVAILABLE is probed once per JVM by MeosOpsRuntime's static initializer (shared across all generated facades). When unavailable, every generated method throws UnsupportedOperationException with a clear message — the wiring layer doesn't have to handle that itself.

Coexistence with berlinmod.MEOSBridge

MEOSBridge.java (BerlinMOD-specific, hand-written) keeps the per-BerlinMOD-query intent — high-level and query-shaped. The wirings here are low-level and catalog-shaped — applicable to any of the ~1,000 stateless or io-meta generated methods, not just the BerlinMOD-9 subset. Both share the same MEOS_AVAILABLE discipline.

Stacking

This PR stacks on codegen/flink-meos-ops (PR #5). Additive-only: 4 new files under flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/. No existing file is touched.

Compile verification

Locally green: 129 .class files total (123 from PR #5's base + 6 new — 3 wirings classes + their nested lambda interfaces + the demo class). Demo MeosWiringsDemoJob compiles clean and is runnable via mvn -q exec:java (per the README).

@estebanzimanyi estebanzimanyi force-pushed the feat/flink-stateless-tier-wirings branch from ef44ffc to 457987c Compare May 29, 2026 12:42
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 29, 2026
Adds MeosBoundedStateMap<K, IN, OUT> — the second tier-wiring class
in the org.mobilitydb.flink.meos.wirings package, stacked on PR MobilityDB#6
(stateless wirings).

Bounded-state is the second-largest streaming tier in the v4 baseline
(797 of 2,097 emitted methods — 513 OO-classified + 284 free-fn). The
canonical pattern is per-key MEOS-handle accumulation: a running tbox
union, a running temporal value, a per-vehicle accumulator that keeps
the MEOS value alive across events.

## Design — state lives as bytes, not as Pointer

A jnr.ffi.Pointer is a raw native-memory address. It is not portable
across JVM restarts; Flink could not checkpoint, savepoint, or replay
state if the wiring stored raw pointers. MeosBoundedStateMap stores
state as byte[] (MEOS-WKB or MEOS-WKT, adopter's choice) with three
adopter-supplied lambdas mediating the round-trip:

- PointerSerialize:   Pointer → byte[]  (called after each step)
- PointerDeserialize: byte[] → Pointer  (called before each step)
- MeosStepFn:         (prior Pointer, event) → (new Pointer, output)

The first event for a key sees prior == null; the wiring handles that
case by skipping deserialize and seeding state with the first event's
result. Subsequent events re-hydrate, mutate, re-serialize.

Net effect: state crossing the operator boundary is always byte[];
checkpoints, savepoints, and rescaling all work correctly. This is
the same serde discipline MobilityDuck's persistent state machines
use.

## Files

- MeosBoundedStateMap.java — the generic wiring class
- demo/MeosBoundedStateDemoJob.java — runnable per-vehicle running
  tbox union pipeline (6 events × 2 vehicles; demonstrates per-key
  isolation, first-event-null correctness, and checkpoint-safe state)
- README — bounded-state row marked ✅ shipped

## Stacks on PR MobilityDB#6

Additive-only; touches no existing file. Locally compile-verified:
135 .class files total (129 from PR MobilityDB#6 base + 6 new — 1 wiring class
+ 3 nested lambda interfaces + MeosStep tuple + 1 demo class).
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 29, 2026
…ompletes the 4-tier matrix)

Adds MeosCrossStreamJoin<L, R, OUT> — the fourth and final
tier-wiring class in the org.mobilitydb.flink.meos.wirings package,
stacked on PR MobilityDB#8 (windowed wirings).

Cross-stream is the smallest streamable tier (140 of 2,097 emitted
methods, ~7%) — pairwise across two pre-keyed streams, time-bounded
match window. Canonical examples: spatial-relations between two
trajectories (edwithin_tgeo_tgeo, eintersects_tgeo_tgeo), distance
on two temporals (nad_tgeo_tgeo, mindistance_tgeo_tgeo).

## Design

Wraps any cross-stream MeosOps call as a ProcessJoinFunction — the
operator backing KeyedStream.intervalJoin(other). Both streams must
be pre-keyed by the same K; only events sharing a key are considered
for pairing. The .between(lowerBound, upperBound) declaration bounds
the time window for match-eligibility, and matches are emitted
event-time-aware (watermark-driven).

The adopter-facing signature keeps the slim ContextLike-pattern used
in MeosWindowedAggregate: the lambda receives the matched (left,
right) pair and a slim Context exposing left/right timestamps (the
bits a MEOS cross-stream call typically needs), free of Flink
internals.

## Files

- MeosCrossStreamJoin.java — the generic wiring class
- demo/MeosCrossStreamDemoJob.java — runnable interval-join demo
  matching two streams of (regionId, vehicleId, tboxWKT, ts) on
  shared regionId key within ±1 minute; emits per-pair overlap
  events via MeosOpsFreeCore.overlaps_tbox_tbox
- README — cross-stream row marked ✅ shipped

## Completes the 4-tier wiring matrix

After this PR, every streamable tier in the v4 baseline has a
generic wiring class in this package:

  stateless        804 methods   →  MeosStatelessMap / MeosStatelessFilter (PR MobilityDB#6)
  bounded-state    797 methods   →  MeosBoundedStateMap (PR MobilityDB#7)
  windowed         161 methods   →  MeosWindowedAggregate (PR MobilityDB#8)
  cross-stream     140 methods   →  MeosCrossStreamJoin (THIS PR)
  io-meta          195 methods   →  covered by MeosStatelessMap
  sequence-only     14 methods   →  inherently non-streamable

Total: 2,097 of 2,097 = 100% of streamable + io-meta generated
MeosOps* methods are wirable through 4 (+ 1 filter sibling) generic
classes; no per-method registration; adopters provide a serializable
lambda per use site.

## Stacks on PR MobilityDB#8

Additive-only; touches no existing file beyond the README row.
Locally compile-verified: 145 .class files total (140 from PR MobilityDB#8
base + 5 new — 1 wiring class + 2 nested lambda interfaces + 1
anonymous ContextLike + 1 demo class).
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 29, 2026
…peline

Adds MeosAllTiersCapstoneDemo — a single Flink DataStream job that
exercises all four tier-wiring classes from the PR MobilityDB#6MobilityDB#7MobilityDB#8MobilityDB#9 stack
in a coherent end-to-end pipeline.

Pipeline (each stage uses one tier-wiring class from the stack):

  ① MeosStatelessFilter    — drop events outside regions of interest
  ② MeosBoundedStateMap    — per-vehicle running tbox union (byte[] state)
  ③ MeosWindowedAggregate  — per-vehicle 30s tumbling tbox summary
  ④ MeosCrossStreamJoin    — interval-join vehicle aggregates against
                             region queries (±1m bound, regionId key)

The pipeline answers: 'for each region, which vehicles had an aggregate
trajectory (running union) overlapping the region's query bbox during
the latest 30-second window?'

Proves the wirings compose into a realistic pipeline shape (not just
work in isolation), each tier delivering its specific contract:
stateless filter is per-event, bounded-state persists handle state
across events as bytes, windowed aggregates window-close-only, cross-
stream interval-joins on shared key.

Stacks on PR MobilityDB#9; additive-only (1 new demo file). Locally compile-
verified: 146 .class files total (145 from PR MobilityDB#9 base + 1 new demo).
…OS facades

Adds the org.mobilitydb.flink.meos.wirings package — thin, generic
Flink-DataStream wrappers around the generated MeosOps* facades from
PR MobilityDB#5, organized per streaming tier.

This PR ships the stateless tier:
- MeosStatelessMap<IN, OUT>: generic MapFunction wrapping any stateless
  MeosOps* method (804 of the 2,097 generated methods qualify per the
  v4 baseline — 92 OO-classified + 712 free-fn)
- MeosStatelessFilter<IN>: generic FilterFunction wrapping any stateless
  boolean-returning MeosOps* method, plus a .fromIntPredicate(...)
  adapter for JMEOS' int-coded predicates
- demo/MeosWiringsDemoJob: runnable end-to-end DataStream pipeline
  parsing TBox WKT → filtering by overlap with a query box →
  serializing surviving boxes to hex-WKB, all through the generated
  facades wired via this package
- README documenting tier vocabulary, the wrap-once-use-everywhere
  pattern, the DataStream-API-only design choice (Table API as future
  follow-up), and coexistence with berlinmod.MEOSBridge

Future follow-ups (one PR per tier, mirroring this one's shape):
- MeosBoundedStateMap (generic KeyedProcessFunction with
  ValueState<Pointer> for MEOS handle per key — covers 797 of the
  generated methods)
- MeosWindowedAggregate (generic ProcessWindowFunction — 161 methods)
- MeosCrossStreamJoin (generic KeyedCoProcessFunction or interval-join
  — 140 methods)
- Optional: Table API sibling (MeosScalarUDF + MeosCatalogRegistrar)
  if the repo adopts Table API for other reasons

Stacks on codegen/flink-meos-ops (PR MobilityDB#5). Additive-only; touches no
existing file. Locally compile-verified: 129 .class files total (123
from the parent PR + 6 new from this package's classes + demo + their
nested lambdas).

(cherry picked from commit 457987c)
@estebanzimanyi estebanzimanyi force-pushed the feat/flink-stateless-tier-wirings branch from 457987c to 1b18a8e Compare May 31, 2026 07:49
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 31, 2026
Adds MeosBoundedStateMap<K, IN, OUT> — the second tier-wiring class
in the org.mobilitydb.flink.meos.wirings package, stacked on PR MobilityDB#6
(stateless wirings).

Bounded-state is the second-largest streaming tier in the v4 baseline
(797 of 2,097 emitted methods — 513 OO-classified + 284 free-fn). The
canonical pattern is per-key MEOS-handle accumulation: a running tbox
union, a running temporal value, a per-vehicle accumulator that keeps
the MEOS value alive across events.

## Design — state lives as bytes, not as Pointer

A jnr.ffi.Pointer is a raw native-memory address. It is not portable
across JVM restarts; Flink could not checkpoint, savepoint, or replay
state if the wiring stored raw pointers. MeosBoundedStateMap stores
state as byte[] (MEOS-WKB or MEOS-WKT, adopter's choice) with three
adopter-supplied lambdas mediating the round-trip:

- PointerSerialize:   Pointer → byte[]  (called after each step)
- PointerDeserialize: byte[] → Pointer  (called before each step)
- MeosStepFn:         (prior Pointer, event) → (new Pointer, output)

The first event for a key sees prior == null; the wiring handles that
case by skipping deserialize and seeding state with the first event's
result. Subsequent events re-hydrate, mutate, re-serialize.

Net effect: state crossing the operator boundary is always byte[];
checkpoints, savepoints, and rescaling all work correctly. This is
the same serde discipline MobilityDuck's persistent state machines
use.

## Files

- MeosBoundedStateMap.java — the generic wiring class
- demo/MeosBoundedStateDemoJob.java — runnable per-vehicle running
  tbox union pipeline (6 events × 2 vehicles; demonstrates per-key
  isolation, first-event-null correctness, and checkpoint-safe state)
- README — bounded-state row marked ✅ shipped

## Stacks on PR MobilityDB#6

Additive-only; touches no existing file. Locally compile-verified:
135 .class files total (129 from PR MobilityDB#6 base + 6 new — 1 wiring class
+ 3 nested lambda interfaces + MeosStep tuple + 1 demo class).

(cherry picked from commit a1f36cd)
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 31, 2026
…ompletes the 4-tier matrix)

Adds MeosCrossStreamJoin<L, R, OUT> — the fourth and final
tier-wiring class in the org.mobilitydb.flink.meos.wirings package,
stacked on PR MobilityDB#8 (windowed wirings).

Cross-stream is the smallest streamable tier (140 of 2,097 emitted
methods, ~7%) — pairwise across two pre-keyed streams, time-bounded
match window. Canonical examples: spatial-relations between two
trajectories (edwithin_tgeo_tgeo, eintersects_tgeo_tgeo), distance
on two temporals (nad_tgeo_tgeo, mindistance_tgeo_tgeo).

## Design

Wraps any cross-stream MeosOps call as a ProcessJoinFunction — the
operator backing KeyedStream.intervalJoin(other). Both streams must
be pre-keyed by the same K; only events sharing a key are considered
for pairing. The .between(lowerBound, upperBound) declaration bounds
the time window for match-eligibility, and matches are emitted
event-time-aware (watermark-driven).

The adopter-facing signature keeps the slim ContextLike-pattern used
in MeosWindowedAggregate: the lambda receives the matched (left,
right) pair and a slim Context exposing left/right timestamps (the
bits a MEOS cross-stream call typically needs), free of Flink
internals.

## Files

- MeosCrossStreamJoin.java — the generic wiring class
- demo/MeosCrossStreamDemoJob.java — runnable interval-join demo
  matching two streams of (regionId, vehicleId, tboxWKT, ts) on
  shared regionId key within ±1 minute; emits per-pair overlap
  events via MeosOpsFreeCore.overlaps_tbox_tbox
- README — cross-stream row marked ✅ shipped

## Completes the 4-tier wiring matrix

After this PR, every streamable tier in the v4 baseline has a
generic wiring class in this package:

  stateless        804 methods   →  MeosStatelessMap / MeosStatelessFilter (PR MobilityDB#6)
  bounded-state    797 methods   →  MeosBoundedStateMap (PR MobilityDB#7)
  windowed         161 methods   →  MeosWindowedAggregate (PR MobilityDB#8)
  cross-stream     140 methods   →  MeosCrossStreamJoin (THIS PR)
  io-meta          195 methods   →  covered by MeosStatelessMap
  sequence-only     14 methods   →  inherently non-streamable

Total: 2,097 of 2,097 = 100% of streamable + io-meta generated
MeosOps* methods are wirable through 4 (+ 1 filter sibling) generic
classes; no per-method registration; adopters provide a serializable
lambda per use site.

## Stacks on PR MobilityDB#8

Additive-only; touches no existing file beyond the README row.
Locally compile-verified: 145 .class files total (140 from PR MobilityDB#8
base + 5 new — 1 wiring class + 2 nested lambda interfaces + 1
anonymous ContextLike + 1 demo class).

(cherry picked from commit 10a03b6)
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 31, 2026
…peline

Adds MeosAllTiersCapstoneDemo — a single Flink DataStream job that
exercises all four tier-wiring classes from the PR MobilityDB#6MobilityDB#7MobilityDB#8MobilityDB#9 stack
in a coherent end-to-end pipeline.

Pipeline (each stage uses one tier-wiring class from the stack):

  ① MeosStatelessFilter    — drop events outside regions of interest
  ② MeosBoundedStateMap    — per-vehicle running tbox union (byte[] state)
  ③ MeosWindowedAggregate  — per-vehicle 30s tumbling tbox summary
  ④ MeosCrossStreamJoin    — interval-join vehicle aggregates against
                             region queries (±1m bound, regionId key)

The pipeline answers: 'for each region, which vehicles had an aggregate
trajectory (running union) overlapping the region's query bbox during
the latest 30-second window?'

Proves the wirings compose into a realistic pipeline shape (not just
work in isolation), each tier delivering its specific contract:
stateless filter is per-event, bounded-state persists handle state
across events as bytes, windowed aggregates window-close-only, cross-
stream interval-joins on shared key.

Stacks on PR MobilityDB#9; additive-only (1 new demo file). Locally compile-
verified: 146 .class files total (145 from PR MobilityDB#9 base + 1 new demo).

(cherry picked from commit d322b8c)
@estebanzimanyi estebanzimanyi changed the base branch from main to codegen/flink-meos-ops June 11, 2026 16:46
@estebanzimanyi

Copy link
Copy Markdown
Member Author

Superseded by the Path-B consolidation: the former 18-deep stack is collapsed into two reviewable topical PRs on top of the merged scaffold — MEOS integration #30 → benchmark #31 — each one clean squashed commit with the generated-facade bulk, dead family-flag profiles, committed target/ artifacts, and invented synthetic corpus removed. Closing as folded into #30/#31.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant