feat(wirings): bounded-state tier DataStream wiring + demo (stacks on #6) #7

Closed
estebanzimanyi wants to merge 1 commit into MobilityDB:feat/flink-stateless-tier-wirings from estebanzimanyi:feat/flink-bounded-state-tier-wirings

@estebanzimanyi

Member

Second of four planned tier-specific wiring follow-ups on PR #5's generated MEOS facade. Stacks on PR #6 (stateless wirings). Adds the bounded-state tier.

What's in this PR

| File | Purpose |
| --- | --- |
| MeosBoundedStateMap<K, IN, OUT> | Generic KeyedProcessFunction with ValueState<byte[]> per key; wraps any bounded-state MeosOps call as a per-key MEOS-handle accumulator |
| demo/MeosBoundedStateDemoJob | Runnable per-vehicle running-tbox-union pipeline (6 events × 2 vehicles); demonstrates per-key isolation, first-event-null correctness, and checkpoint-safe state |
| README.md | bounded-state row marked ✅ shipped |

Design — state crosses the boundary as bytes, not Pointer

A jnr.ffi.Pointer is a raw native-memory address. Storing one in Flink's ValueState would be a correctness time-bomb — pointers don't survive checkpoints, savepoints, or operator-instance rescaling. So MeosBoundedStateMap keeps state as byte[] (MEOS-WKB or MEOS-WKT, adopter's choice) with three adopter-supplied lambdas mediating the round-trip through MEOS:

byte[] state                  -- the per-key serialized MEOS value (in checkpoint)
    ↓ deserialize (bytes → Pointer)
Pointer prev                  -- the in-flight MEOS handle
    ↓ step(prev, event) → (newPointer, output)
Pointer next, OUT out         -- the new in-flight handle + per-event output
    ↓ serialize (Pointer → bytes)
byte[] newState               -- the per-key serialized new MEOS value (back to checkpoint)

The first event for a key sees prior == null (no state yet); the wiring handles that case by skipping deserialize and letting the step seed state with the first event's result. This is the same serde discipline that MobilityDuck's persistent state machines use.
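The round-trip above can be modelled in plain Java without Flink or jnr-ffi. This is only a sketch under stated assumptions: `BoundedStateSketch`, its `Step` tuple, and the `HashMap` standing in for per-key `ValueState<byte[]>` are hypothetical names for illustration, not the classes in this PR, and the "handle" is a `Long` rather than a native pointer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Plain-Java model of the byte[]-state loop; no Flink or jnr-ffi involved. */
final class BoundedStateSketch<K, H, IN, OUT> {

    /** Result of one step: the new in-flight handle plus the per-event output. */
    static final class Step<H, OUT> {
        final H next;
        final OUT out;
        Step(H next, OUT out) { this.next = next; this.out = out; }
    }

    // Hypothetical stand-in for per-key ValueState<byte[]>: only bytes are stored.
    private final Map<K, byte[]> state = new HashMap<>();
    private final Function<byte[], H> deserialize;
    private final Function<H, byte[]> serialize;
    private final BiFunction<H, IN, Step<H, OUT>> step;

    BoundedStateSketch(Function<byte[], H> deserialize,
                       Function<H, byte[]> serialize,
                       BiFunction<H, IN, Step<H, OUT>> step) {
        this.deserialize = deserialize;
        this.serialize = serialize;
        this.step = step;
    }

    OUT process(K key, IN event) {
        byte[] stored = state.get(key);
        // First event for a key: nothing stored, so deserialize is skipped.
        H prior = (stored == null) ? null : deserialize.apply(stored);
        Step<H, OUT> result = step.apply(prior, event);
        state.put(key, serialize.apply(result.next));   // handle goes back to bytes
        return result.out;
    }

    /** Toy usage: the "handle" is a Long running maximum stored as UTF-8 text. */
    static List<String> demo() {
        BoundedStateSketch<String, Long, Long, String> runningMax =
            new BoundedStateSketch<String, Long, Long, String>(
                bytes -> Long.parseLong(new String(bytes, StandardCharsets.UTF_8)),
                handle -> Long.toString(handle).getBytes(StandardCharsets.UTF_8),
                (prior, event) -> {
                    long next = (prior == null) ? event : Math.max(prior, event);
                    return new Step<Long, String>(next, "max=" + next);
                });
        List<String> out = new ArrayList<>();
        out.add(runningMax.process("v1", 5L));
        out.add(runningMax.process("v2", 9L));
        out.add(runningMax.process("v1", 3L));
        out.add(runningMax.process("v1", 8L));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the toy run, the third event for `v1` still reports `max=5` even though `v2` has already seen 9, which is the per-key isolation the wiring relies on. Nothing but `byte[]` ever sits in the map between events.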

Tier coverage now (PR #6 + this PR)

| Tier | Method count (v4 baseline) | Wirable through |
| --- | --- | --- |
| stateless | 804 | MeosStatelessMap / MeosStatelessFilter (PR #6) |
| bounded-state | 797 | MeosBoundedStateMap (this PR) |
| windowed | 161 | next follow-up (MeosWindowedAggregate) |
| cross-stream | 140 | next follow-up (MeosCrossStreamJoin) |
| io-meta | 195 | covered transitively by MeosStatelessMap (no state, no window) |
| sequence-only | 14 | n/a (inherently non-streamable) |

Cumulative: ~1,800 of PR #5's 2,097 generated methods (86%) are now wirable through 3 generic classes (Map + Filter + BoundedStateMap), without per-method registration.
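As a quick check of that figure, assuming it counts the stateless, bounded-state, and io-meta rows (the three tiers described above as already wirable):

```latex
\[
804 + 797 + 195 = 1796, \qquad \frac{1796}{2097} \approx 0.856 \approx 86\%
\]
```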

Demo highlights

MeosBoundedStateDemoJob is a 6-event × 2-vehicle pipeline that computes a per-vehicle running tbox union via MeosOpsFreeCore.union_tbox_tbox. State holds the MEOS-WKT text of the current union; each event:

  1. Deserializes the prior state via MeosOpsTBox.tbox_in (or null on first event).
  2. Calls union_tbox_tbox(prior, eventTbox, 0) (skipping the call entirely when prior is null).
  3. Serializes the new state via MeosOpsTBox.tbox_out(newUnion, 6).
  4. Emits (vehicleId, runningUnionWKT).

Output (when run with libmeos available): 6 lines of monotonically growing per-vehicle union tboxes, with vehicle 1's state independent of vehicle 2's.
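The four steps can be mimicked without libmeos to see what "monotonically growing" and "independent per vehicle" mean in practice. The following is a sketch only: `Box`, its comma-separated text form, and the sample events are hypothetical stand-ins, not MEOS tboxes, MEOS-WKT, or the demo's actual data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the per-vehicle running box union; does not call MEOS. */
final class RunningBoxUnionSketch {

    /** Hypothetical stand-in for a tbox: a value span crossed with a time span. */
    static final class Box {
        final double vmin, vmax;
        final long tmin, tmax;
        Box(double vmin, double vmax, long tmin, long tmax) {
            this.vmin = vmin; this.vmax = vmax; this.tmin = tmin; this.tmax = tmax;
        }
        /** Smallest box covering both inputs. */
        static Box union(Box a, Box b) {
            return new Box(Math.min(a.vmin, b.vmin), Math.max(a.vmax, b.vmax),
                           Math.min(a.tmin, b.tmin), Math.max(a.tmax, b.tmax));
        }
        /** Illustrative text form only; this is not MEOS-WKT. */
        String toText() { return vmin + "," + vmax + "," + tmin + "," + tmax; }
        static Box fromText(String s) {
            String[] p = s.split(",");
            return new Box(Double.parseDouble(p[0]), Double.parseDouble(p[1]),
                           Long.parseLong(p[2]), Long.parseLong(p[3]));
        }
    }

    static final class Event {
        final int vehicleId;
        final Box box;
        Event(int vehicleId, Box box) { this.vehicleId = vehicleId; this.box = box; }
    }

    /** Made-up input: six events spread over two vehicles. */
    static List<Event> sample() {
        return Arrays.asList(
            new Event(1, new Box(1.0, 2.0, 0, 10)),
            new Event(2, new Box(5.0, 6.0, 0, 10)),
            new Event(1, new Box(0.0, 1.5, 10, 20)),
            new Event(2, new Box(5.5, 7.0, 10, 20)),
            new Event(1, new Box(1.0, 3.0, 20, 30)),
            new Event(2, new Box(4.0, 5.0, 20, 30)));
    }

    static List<String> run(List<Event> events) {
        Map<Integer, String> state = new HashMap<>();   // per-vehicle text state
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            String stored = state.get(e.vehicleId);
            // Null prior: the union call is skipped and the event's box seeds the state.
            Box next = (stored == null) ? e.box : Box.union(Box.fromText(stored), e.box);
            String text = next.toText();
            state.put(e.vehicleId, text);
            out.add(e.vehicleId + ":" + text);
        }
        return out;
    }

    public static void main(String[] args) {
        run(sample()).forEach(System.out::println);
    }
}
```

Each vehicle's emitted box only ever widens, and events for one vehicle never change the other's line.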

Stacking

This PR stacks on feat/flink-stateless-tier-wirings (PR #6). Additive-only: 3 file changes (1 new wiring class + 1 new demo + 1 README row marked ✅). No existing file is touched beyond the README row.

Compile verification

Locally green: 135 .class files total (129 from PR #6 base + 6 new — 1 wiring class + 3 nested lambda interfaces + MeosStep tuple + 1 demo class).

@estebanzimanyi estebanzimanyi force-pushed the feat/flink-bounded-state-tier-wirings branch from 539eb14 to a1f36cd Compare May 29, 2026 12:42
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 29, 2026
Adds MeosWindowedAggregate<K, IN, OUT, W> — the third tier-wiring
class in the org.mobilitydb.flink.meos.wirings package, stacked on
PR MobilityDB#7 (bounded-state wirings).

Windowed is the third streaming tier (161 of 2,097 emitted methods,
~8%) — output cardinality changes; one MEOS aggregate per window.
Canonical examples: temporal_length(tgeo) for per-window trajectory
length, temporal_twavg(tnumber) for time-weighted average per
window, per-class _trajectory / _time / _timespan accessors.

## Design

Wraps any windowed MeosOps call as a ProcessWindowFunction with a
slim adopter-facing signature: the lambda receives the window
metadata, the iterable of in-window events, and a slim Context
exposing key + processing-time + watermark (free of Flink internals).

Unlike bounded-state, NO MEOS handle persists across window
boundaries — each window's MEOS value is built fresh from the
iterable on window close, used to compute the output, discarded.
The iterable's events are Flink-side data; MEOS handles are
short-lived per-window.
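
To make the "built fresh per window" point concrete, here is a minimal batch-style sketch in plain Java. It is an illustration under assumptions, not the wiring itself: `TumblingWindowSketch`, its `Event` type, and the max aggregate are hypothetical, and watermark-driven window close is not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of per-key 30 s tumbling windows; no Flink involved. */
final class TumblingWindowSketch {
    static final long WINDOW_MS = 30_000L;

    static final class Event {
        final String key;
        final long ts;        // event time in milliseconds, assumed non-negative
        final double value;
        Event(String key, long ts, double value) {
            this.key = key; this.ts = ts; this.value = value;
        }
    }

    /** Start of the tumbling window that contains ts. */
    static long windowStart(long ts) { return ts - (ts % WINDOW_MS); }

    /** Made-up input: two windows for v1, one for v2. */
    static List<Event> sample() {
        return Arrays.asList(
            new Event("v1", 1_000, 2.0),
            new Event("v1", 29_999, 5.0),
            new Event("v1", 30_000, 1.0),
            new Event("v2", 45_000, 7.0));
    }

    /** One aggregate per (key, window), computed only from that window's events. */
    static Map<String, Double> maxPerWindow(List<Event> events) {
        Map<String, List<Event>> buckets = new TreeMap<>();
        for (Event e : events) {
            String bucket = e.key + "@" + windowStart(e.ts);
            buckets.computeIfAbsent(bucket, k -> new ArrayList<>()).add(e);
        }
        Map<String, Double> out = new TreeMap<>();
        for (Map.Entry<String, List<Event>> b : buckets.entrySet()) {
            double max = Double.NEGATIVE_INFINITY;   // nothing carried over from earlier windows
            for (Event e : b.getValue()) max = Math.max(max, e.value);
            out.put(b.getKey(), max);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(maxPerWindow(sample()));
    }
}
```

The second `v1` window reports 1.0 rather than 5.0 because its aggregate starts from scratch; in the bounded-state tier the earlier value would have been carried forward.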

## Files

- MeosWindowedAggregate.java — the generic wiring class
- demo/MeosWindowedDemoJob.java — runnable 30s-tumbling-window
  per-vehicle aggregate-tbox demo (8 events × 2 vehicles × 2 windows;
  demonstrates window-close timing, per-key isolation, fresh-per-
  window aggregation)
- README — windowed row marked ✅ shipped

## Stacks on PR MobilityDB#7

Additive-only; touches no existing file beyond the README row.
Locally compile-verified: 140 .class files total (135 from PR MobilityDB#7
base + 5 new — 1 wiring class + 2 nested lambda interfaces + 1
anonymous ContextLike + 1 demo class).
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 29, 2026
…ompletes the 4-tier matrix)

Adds MeosCrossStreamJoin<L, R, OUT> — the fourth and final
tier-wiring class in the org.mobilitydb.flink.meos.wirings package,
stacked on PR MobilityDB#8 (windowed wirings).

Cross-stream is the smallest streamable tier (140 of 2,097 emitted
methods, ~7%) — pairwise across two pre-keyed streams, time-bounded
match window. Canonical examples: spatial-relations between two
trajectories (edwithin_tgeo_tgeo, eintersects_tgeo_tgeo), distance
on two temporals (nad_tgeo_tgeo, mindistance_tgeo_tgeo).

## Design

Wraps any cross-stream MeosOps call as a ProcessJoinFunction — the
operator backing KeyedStream.intervalJoin(other). Both streams must
be pre-keyed by the same K; only events sharing a key are considered
for pairing. The .between(lowerBound, upperBound) declaration bounds
the time window for match-eligibility, and matches are emitted
event-time-aware (watermark-driven).

The adopter-facing signature keeps the slim ContextLike-pattern used
in MeosWindowedAggregate: the lambda receives the matched (left,
right) pair and a slim Context exposing left/right timestamps (the
bits a MEOS cross-stream call typically needs), free of Flink
internals.
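
The match-eligibility rule can be written down as a nested loop in plain Java. This is a sketch under assumptions: `IntervalJoinSketch` and its `Event` type are hypothetical, the inclusive bounds mirror Flink's default interval-join behaviour, and watermark-driven emission and state cleanup are not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of a keyed interval join; no Flink involved. */
final class IntervalJoinSketch {

    static final class Event {
        final String key;
        final long ts;          // event time in milliseconds
        final String payload;
        Event(String key, long ts, String payload) {
            this.key = key; this.ts = ts; this.payload = payload;
        }
    }

    /**
     * Pairs left/right events that share a key and satisfy
     * left.ts + lowerMs <= right.ts <= left.ts + upperMs (both bounds inclusive).
     */
    static List<String> join(List<Event> left, List<Event> right, long lowerMs, long upperMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inBounds = r.ts >= l.ts + lowerMs && r.ts <= l.ts + upperMs;
                if (sameKey && inBounds) out.add(l.payload + "|" + r.payload);
            }
        }
        return out;
    }

    /** Made-up data joined within plus or minus one minute. */
    static List<String> demo() {
        List<Event> left = Arrays.asList(
            new Event("r1", 100_000, "vA"),
            new Event("r2", 100_000, "vB"));
        List<Event> right = Arrays.asList(
            new Event("r1", 160_000, "q1"),   // exactly +60 s: matches
            new Event("r1", 160_001, "q2"),   // 1 ms too late: no match
            new Event("r2", 50_000, "q3"),    // -50 s: matches vB
            new Event("r1", 40_000, "q4"));   // exactly -60 s: matches
        return join(left, right, -60_000, 60_000);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only same-key pairs inside the bound survive; a MEOS predicate such as an overlap test would then run on each surviving pair.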

## Files

- MeosCrossStreamJoin.java — the generic wiring class
- demo/MeosCrossStreamDemoJob.java — runnable interval-join demo
  matching two streams of (regionId, vehicleId, tboxWKT, ts) on
  shared regionId key within ±1 minute; emits per-pair overlap
  events via MeosOpsFreeCore.overlaps_tbox_tbox
- README — cross-stream row marked ✅ shipped

## Completes the 4-tier wiring matrix

After this PR, every streamable tier in the v4 baseline has a
generic wiring class in this package:

  stateless        804 methods   →  MeosStatelessMap / MeosStatelessFilter (PR MobilityDB#6)
  bounded-state    797 methods   →  MeosBoundedStateMap (PR MobilityDB#7)
  windowed         161 methods   →  MeosWindowedAggregate (PR MobilityDB#8)
  cross-stream     140 methods   →  MeosCrossStreamJoin (THIS PR)
  io-meta          195 methods   →  covered by MeosStatelessMap
  sequence-only     14 methods   →  inherently non-streamable

Total: 2,097 of 2,097 = 100% of streamable + io-meta generated
MeosOps* methods are wirable through 4 (+ 1 filter sibling) generic
classes; no per-method registration; adopters provide a serializable
lambda per use site.

## Stacks on PR MobilityDB#8

Additive-only; touches no existing file beyond the README row.
Locally compile-verified: 145 .class files total (140 from PR MobilityDB#8
base + 5 new — 1 wiring class + 2 nested lambda interfaces + 1
anonymous ContextLike + 1 demo class).
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 29, 2026
…peline

Adds MeosAllTiersCapstoneDemo, a single Flink DataStream job that
exercises all four tier-wiring classes from the PR MobilityDB#6,
MobilityDB#7, MobilityDB#8, MobilityDB#9 stack in a coherent
end-to-end pipeline.

Pipeline (each stage uses one tier-wiring class from the stack):

  ① MeosStatelessFilter    — drop events outside regions of interest
  ② MeosBoundedStateMap    — per-vehicle running tbox union (byte[] state)
  ③ MeosWindowedAggregate  — per-vehicle 30s tumbling tbox summary
  ④ MeosCrossStreamJoin    — interval-join vehicle aggregates against
                             region queries (±1m bound, regionId key)

The pipeline answers: 'for each region, which vehicles had an aggregate
trajectory (running union) overlapping the region's query bbox during
the latest 30-second window?'

Proves the wirings compose into a realistic pipeline shape (not just
work in isolation), each tier delivering its specific contract:
stateless filter is per-event, bounded-state persists handle state
across events as bytes, windowed aggregates window-close-only, cross-
stream interval-joins on shared key.

Stacks on PR MobilityDB#9; additive-only (1 new demo file). Locally compile-
verified: 146 .class files total (145 from PR MobilityDB#9 base + 1 new demo).

Adds MeosBoundedStateMap<K, IN, OUT>, the second tier-wiring class
in the org.mobilitydb.flink.meos.wirings package, stacked on PR MobilityDB#6
(stateless wirings).

Bounded-state is the second-largest streaming tier in the v4 baseline
(797 of 2,097 emitted methods — 513 OO-classified + 284 free-fn). The
canonical pattern is per-key MEOS-handle accumulation: a running tbox
union, a running temporal value, a per-vehicle accumulator that keeps
the MEOS value alive across events.

## Design — state lives as bytes, not as Pointer

A jnr.ffi.Pointer is a raw native-memory address. It is not portable
across JVM restarts; Flink could not checkpoint, savepoint, or replay
state if the wiring stored raw pointers. MeosBoundedStateMap stores
state as byte[] (MEOS-WKB or MEOS-WKT, adopter's choice) with three
adopter-supplied lambdas mediating the round-trip:

- PointerSerialize:   Pointer → byte[]  (called after each step)
- PointerDeserialize: byte[] → Pointer  (called before each step)
- MeosStepFn:         (prior Pointer, event) → (new Pointer, output)

The first event for a key sees prior == null; the wiring handles that
case by skipping deserialize and seeding state with the first event's
result. Subsequent events re-hydrate, mutate, re-serialize.

Net effect: state crossing the operator boundary is always byte[];
checkpoints, savepoints, and rescaling all work correctly. This is
the same serde discipline MobilityDuck's persistent state machines
use.

## Files

- MeosBoundedStateMap.java — the generic wiring class
- demo/MeosBoundedStateDemoJob.java — runnable per-vehicle running
  tbox union pipeline (6 events × 2 vehicles; demonstrates per-key
  isolation, first-event-null correctness, and checkpoint-safe state)
- README — bounded-state row marked ✅ shipped

## Stacks on PR MobilityDB#6

Additive-only; touches no existing file beyond the README row.
Locally compile-verified: 135 .class files total (129 from PR MobilityDB#6
base + 6 new: 1 wiring class + 3 nested lambda interfaces + MeosStep
tuple + 1 demo class).

(cherry picked from commit a1f36cd)
@estebanzimanyi estebanzimanyi force-pushed the feat/flink-bounded-state-tier-wirings branch from a1f36cd to 49e7824 Compare May 31, 2026 07:49
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 31, 2026
Adds MeosWindowedAggregate<K, IN, OUT, W> — the third tier-wiring
class in the org.mobilitydb.flink.meos.wirings package, stacked on
PR MobilityDB#7 (bounded-state wirings).

(cherry picked from commit ef47708)
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 31, 2026
…ompletes the 4-tier matrix)

Adds MeosCrossStreamJoin<L, R, OUT> — the fourth and final
tier-wiring class in the org.mobilitydb.flink.meos.wirings package,
stacked on PR MobilityDB#8 (windowed wirings).

(cherry picked from commit 10a03b6)
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 31, 2026
…peline

Adds MeosAllTiersCapstoneDemo, a single Flink DataStream job that
exercises all four tier-wiring classes from the PR MobilityDB#6,
MobilityDB#7, MobilityDB#8, MobilityDB#9 stack in a coherent
end-to-end pipeline.

(cherry picked from commit d322b8c)
@estebanzimanyi estebanzimanyi changed the base branch from main to feat/flink-stateless-tier-wirings June 11, 2026 16:46
@estebanzimanyi

Member Author

Superseded by the Path-B consolidation: the former 18-deep stack is collapsed into two reviewable topical PRs on top of the merged scaffold, MEOS integration #30 → benchmark #31. Each is one clean squashed commit, with the generated-facade bulk, dead family-flag profiles, committed target/ artifacts, and invented synthetic corpus removed. Closing as folded into #30/#31.
