Skip to content

feat(wirings): cross-stream tier DataStream wiring + demo — completes the 4-tier matrix (stacks on #8)#9

Closed
estebanzimanyi wants to merge 1 commit into
MobilityDB:feat/flink-windowed-tier-wiringsfrom
estebanzimanyi:feat/flink-cross-stream-tier-wirings
Closed

feat(wirings): cross-stream tier DataStream wiring + demo — completes the 4-tier matrix (stacks on #8)#9
estebanzimanyi wants to merge 1 commit into
MobilityDB:feat/flink-windowed-tier-wiringsfrom
estebanzimanyi:feat/flink-cross-stream-tier-wirings

Conversation

@estebanzimanyi

Copy link
Copy Markdown
Member

Fourth and final tier-specific wiring follow-up on PR #5's generated MEOS facade. Stacks on PR #8 (windowed). Adds the cross-stream tier.

With this PR, the 4-tier wiring matrix is complete — every streamable tier in the v4 baseline has a generic wiring class.

What's in this PR

File Purpose
MeosCrossStreamJoin<L, R, OUT> Generic ProcessJoinFunction (the operator backing KeyedStream.intervalJoin(other)); time-bounded match window; same-key pairing
demo/MeosCrossStreamDemoJob Runnable interval-join demo matching two streams on shared regionId key within ±1 minute; emits per-pair overlap events
README.md cross-stream row marked ✅ shipped

Design — pre-keyed interval-join, watermark-driven matches

Cross-stream is "pairwise across two pre-keyed streams within a bounded time window". Both streams must be keyBy(K)-ed by the same K; only events sharing a key are considered for pairing. The .between(lowerBound, upperBound) declaration bounds the time window; matches fire event-time-aware (watermark-driven).

The adopter-facing signature keeps the slim ContextLike pattern used in MeosWindowedAggregate:

.intervalJoin(other)
.between(Time.minutes(-1), Time.minutes(1))
.process(new MeosCrossStreamJoin<L, R, OUT>(
    (left, right, ctx) -> {
        Pointer leftT  = left.toTGeoPointer();
        Pointer rightT = right.toTGeoPointer();
        if (MeosOpsTGeo.edwithin_tgeo_tgeo(leftT, rightT, 100.0) != 0) {
            return new MeetingEvent(left.id(), right.id(), ctx.getLeftTimestamp());
        }
        return null;  // no output for non-matches
    }));

Final tier coverage after this PR

Tier Method count (v4 baseline) Wirable through
stateless 804 MeosStatelessMap / MeosStatelessFilter (PR #6)
bounded-state 797 MeosBoundedStateMap (PR #7)
windowed 161 MeosWindowedAggregate (PR #8)
cross-stream 140 MeosCrossStreamJoin (this PR)
io-meta 195 covered transitively by MeosStatelessMap
sequence-only 14 n/a — inherently non-streamable
Total wirable 2,097 / 2,097 = 100% 5 generic classes

No per-method registration; adopters provide a serializable lambda per use site.

Demo highlights

MeosCrossStreamDemoJob is a 6-event × 2-region cross-stream pipeline:

  • Two parallel streams (A, B) keyed by regionId
  • .intervalJoin(other).between(-1m, +1m) pairs A-events with B-events in the same region within ±1 minute
  • Per matched pair: test overlap via MeosOpsFreeCore.overlaps_tbox_tbox, emit (regionId, vehAId, vehBId, leftTs, rightTs) on match, skip on non-match
  • Proves: interval-join semantics, per-key isolation, pairwise MEOS call from a matched pair

Stacking

This PR stacks on feat/flink-windowed-tier-wirings (PR #8). Additive-only: 3 file changes (1 new wiring class + 1 new demo + 1 README row marked ✅).

Compile verification

Locally green: 145 .class files total (140 from PR #8 base + 5 new — 1 wiring class + 2 nested lambda interfaces + 1 anonymous ContextLike + 1 demo class).

What the 4-PR chain delivers cumulatively

Layer Files Wirings package methods Tier coverage
PR #5 codegen 57 2,097 (generated facades) n/a
PR #6 stateless wirings +4 +2 wirings 804 + 195 io-meta
PR #7 bounded-state wiring +3 +1 wiring +797
PR #8 windowed wiring +3 +1 wiring +161
PR #9 cross-stream wiring (this) +3 +1 wiring +140
End state 70 total 5 wirings covering 100% of streamable surface 2,097 / 2,097

@estebanzimanyi estebanzimanyi force-pushed the feat/flink-cross-stream-tier-wirings branch from d1f229f to 10a03b6 Compare May 29, 2026 12:42
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 29, 2026
…peline

Adds MeosAllTiersCapstoneDemo — a single Flink DataStream job that
exercises all four tier-wiring classes from the PR MobilityDB#6MobilityDB#7MobilityDB#8MobilityDB#9 stack
in a coherent end-to-end pipeline.

Pipeline (each stage uses one tier-wiring class from the stack):

  ① MeosStatelessFilter    — drop events outside regions of interest
  ② MeosBoundedStateMap    — per-vehicle running tbox union (byte[] state)
  ③ MeosWindowedAggregate  — per-vehicle 30s tumbling tbox summary
  ④ MeosCrossStreamJoin    — interval-join vehicle aggregates against
                             region queries (±1m bound, regionId key)

The pipeline answers: 'for each region, which vehicles had an aggregate
trajectory (running union) overlapping the region's query bbox during
the latest 30-second window?'

Proves the wirings compose into a realistic pipeline shape (not just
work in isolation), each tier delivering its specific contract:
stateless filter is per-event, bounded-state persists handle state
across events as bytes, windowed aggregates window-close-only, cross-
stream interval-joins on shared key.

Stacks on PR MobilityDB#9; additive-only (1 new demo file). Locally compile-
verified: 146 .class files total (145 from PR MobilityDB#9 base + 1 new demo).
…ompletes the 4-tier matrix)

Adds MeosCrossStreamJoin<L, R, OUT> — the fourth and final
tier-wiring class in the org.mobilitydb.flink.meos.wirings package,
stacked on PR MobilityDB#8 (windowed wirings).

Cross-stream is the smallest streamable tier (140 of 2,097 emitted
methods, ~7%) — pairwise across two pre-keyed streams, time-bounded
match window. Canonical examples: spatial-relations between two
trajectories (edwithin_tgeo_tgeo, eintersects_tgeo_tgeo), distance
on two temporals (nad_tgeo_tgeo, mindistance_tgeo_tgeo).

## Design

Wraps any cross-stream MeosOps call as a ProcessJoinFunction — the
operator backing KeyedStream.intervalJoin(other). Both streams must
be pre-keyed by the same K; only events sharing a key are considered
for pairing. The .between(lowerBound, upperBound) declaration bounds
the time window for match-eligibility, and matches are emitted
event-time-aware (watermark-driven).

The adopter-facing signature keeps the slim ContextLike-pattern used
in MeosWindowedAggregate: the lambda receives the matched (left,
right) pair and a slim Context exposing left/right timestamps (the
bits a MEOS cross-stream call typically needs), free of Flink
internals.

## Files

- MeosCrossStreamJoin.java — the generic wiring class
- demo/MeosCrossStreamDemoJob.java — runnable interval-join demo
  matching two streams of (regionId, vehicleId, tboxWKT, ts) on
  shared regionId key within ±1 minute; emits per-pair overlap
  events via MeosOpsFreeCore.overlaps_tbox_tbox
- README — cross-stream row marked ✅ shipped

## Completes the 4-tier wiring matrix

After this PR, every streamable tier in the v4 baseline has a
generic wiring class in this package:

  stateless        804 methods   →  MeosStatelessMap / MeosStatelessFilter (PR MobilityDB#6)
  bounded-state    797 methods   →  MeosBoundedStateMap (PR MobilityDB#7)
  windowed         161 methods   →  MeosWindowedAggregate (PR MobilityDB#8)
  cross-stream     140 methods   →  MeosCrossStreamJoin (THIS PR)
  io-meta          195 methods   →  covered by MeosStatelessMap
  sequence-only     14 methods   →  inherently non-streamable

Total: 2,097 of 2,097 = 100% of streamable + io-meta generated
MeosOps* methods are wirable through 4 (+ 1 filter sibling) generic
classes; no per-method registration; adopters provide a serializable
lambda per use site.

## Stacks on PR MobilityDB#8

Additive-only; touches no existing file beyond the README row.
Locally compile-verified: 145 .class files total (140 from PR MobilityDB#8
base + 5 new — 1 wiring class + 2 nested lambda interfaces + 1
anonymous ContextLike + 1 demo class).

(cherry picked from commit 10a03b6)
@estebanzimanyi estebanzimanyi force-pushed the feat/flink-cross-stream-tier-wirings branch from 10a03b6 to d71d460 Compare May 31, 2026 07:49
estebanzimanyi added a commit to estebanzimanyi/MobilityFlink that referenced this pull request May 31, 2026
…peline

Adds MeosAllTiersCapstoneDemo — a single Flink DataStream job that
exercises all four tier-wiring classes from the PR MobilityDB#6MobilityDB#7MobilityDB#8MobilityDB#9 stack
in a coherent end-to-end pipeline.

Pipeline (each stage uses one tier-wiring class from the stack):

  ① MeosStatelessFilter    — drop events outside regions of interest
  ② MeosBoundedStateMap    — per-vehicle running tbox union (byte[] state)
  ③ MeosWindowedAggregate  — per-vehicle 30s tumbling tbox summary
  ④ MeosCrossStreamJoin    — interval-join vehicle aggregates against
                             region queries (±1m bound, regionId key)

The pipeline answers: 'for each region, which vehicles had an aggregate
trajectory (running union) overlapping the region's query bbox during
the latest 30-second window?'

Proves the wirings compose into a realistic pipeline shape (not just
work in isolation), each tier delivering its specific contract:
stateless filter is per-event, bounded-state persists handle state
across events as bytes, windowed aggregates window-close-only, cross-
stream interval-joins on shared key.

Stacks on PR MobilityDB#9; additive-only (1 new demo file). Locally compile-
verified: 146 .class files total (145 from PR MobilityDB#9 base + 1 new demo).

(cherry picked from commit d322b8c)
@estebanzimanyi estebanzimanyi changed the base branch from main to feat/flink-windowed-tier-wirings June 11, 2026 16:46
@estebanzimanyi

Copy link
Copy Markdown
Member Author

Superseded by the Path-B consolidation: the former 18-deep stack is collapsed into two reviewable topical PRs on top of the merged scaffold — MEOS integration #30 → benchmark #31 — each one clean squashed commit with the generated-facade bulk, dead family-flag profiles, committed target/ artifacts, and invented synthetic corpus removed. Closing as folded into #30/#31.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant