Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,18 @@ class CoreOptions:
.with_description("Read batch size for any file format if it supports.")
)

READ_PARALLELISM: ConfigOption[int] = (
ConfigOptions.key("read.parallelism")
.int_type()
.default_value(1)
.with_description(
"Parallelism for reading splits within a single TableRead call. "
"The value 1 (default) keeps reads serial. Values >= 2 enable a "
"thread pool that reads splits concurrently and assembles the "
"result in input order. Has no effect when fewer than 2 splits "
"are passed.")
)

ADD_COLUMN_BEFORE_PARTITION: ConfigOption[bool] = (
ConfigOptions.key("add-column-before-partition")
.boolean_type()
Expand Down Expand Up @@ -702,6 +714,9 @@ def local_cache_whitelist(self) -> str:
def read_batch_size(self, default=None) -> int:
return self.options.get(CoreOptions.READ_BATCH_SIZE, default or 1024)

def read_parallelism(self, default=None) -> int:
return self.options.get(CoreOptions.READ_PARALLELISM, default)

def add_column_before_partition(self) -> bool:
return self.options.get(CoreOptions.ADD_COLUMN_BEFORE_PARTITION, False)

Expand Down
215 changes: 211 additions & 4 deletions paimon-python/pypaimon/read/table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.

import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, Iterator, List, Optional

import pandas
Expand All @@ -32,6 +34,41 @@
ROW_KIND_COLUMN = "_row_kind"


class _RemainingRows:
"""Thread-safe remaining-rows counter for parallel reads.

Row quota is pre-debited under a single lock so that any rows that
threads commit to emit are guaranteed not to overshoot the limit, even
if individual readers keep decoding one extra batch after the quota is
exhausted.

When ``limit`` is None the counter is unbounded and ``try_consume``
always returns the requested row count.
"""

def __init__(self, limit: Optional[int]):
self._lock = threading.Lock()
self._remaining = limit # None == unlimited

def try_consume(self, requested: int) -> int:
if self._remaining is None:
return requested
if requested <= 0:
return 0
with self._lock:
if self._remaining <= 0:
return 0
allowed = min(requested, self._remaining)
self._remaining -= allowed
return allowed

def exhausted(self) -> bool:
if self._remaining is None:
return False
with self._lock:
return self._remaining <= 0


class TableRead:
"""Implementation of TableRead for native Python reading."""

Expand All @@ -52,6 +89,7 @@ def __init__(
self.include_row_kind = include_row_kind
self.nested_name_paths = nested_name_paths
self.limit = limit
self._read_parallelism = self.table.options.read_parallelism()

def to_iterator(self, splits: List[Split]) -> Iterator:
limit = self.limit
Expand Down Expand Up @@ -104,13 +142,34 @@ def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema):

return pyarrow.RecordBatch.from_arrays(columns, schema=target_schema)

def to_arrow(self, splits: List[Split]) -> Optional[pyarrow.Table]:
batch_reader = self.to_arrow_batch_reader(splits)
def to_arrow(
self,
splits: List[Split],
parallelism: Optional[int] = None,
) -> Optional[pyarrow.Table]:
"""Read ``splits`` into a single arrow ``Table``.

Args:
splits: scan-plan splits returned from a ``TableScan``.
parallelism: optional runtime override of the
``read.parallelism`` table option. ``None`` (default) falls
back to the table option; a non-None value temporarily
overrides it for this call. ``1`` keeps reads serial;
``>= 2`` enables a thread pool that reads splits
concurrently and assembles the final table in input order.
Must be ``>= 1``.
"""
# TODO: default read.parallelism to min(splits, cpu_count()) once stable
effective = self._resolve_parallelism(parallelism)
schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
if self.include_row_kind:
schema = self._add_row_kind_to_schema(schema)

if self._should_run_parallel(splits, effective):
return self._to_arrow_parallel(splits, schema, effective)

batch_reader = self.to_arrow_batch_reader(splits)

table_list = []
for batch in iter(batch_reader.read_next_batch, None):
if batch.num_rows == 0:
Expand Down Expand Up @@ -183,6 +242,146 @@ def _arrow_batch_generator(self, splits: List[Split], schema: pyarrow.Schema) ->
finally:
reader.close()

def _resolve_parallelism(self, runtime: Optional[int]) -> int:
"""Pick the effective parallelism and reject illegal values.

Priority: explicit ``parallelism`` argument > ``read.parallelism``
table option > built-in default of 1. The validation message names
whichever source produced the offending value, so users know where
to fix it.
"""
if runtime is not None:
value = runtime
source = "parallelism"
else:
value = self._read_parallelism
source = "read.parallelism"
if value < 1:
raise ValueError(f"{source} must be >= 1, got {value}")
return value

def _should_run_parallel(
self,
splits: List[Split],
effective: int,
) -> bool:
"""Decide whether to take the parallel read path.

``effective == 1`` falls back to the serial path (no thread pool
overhead, no behavior change). A single split is never
parallelized since there is nothing to fan out across.
"""
return effective >= 2 and len(splits) >= 2

def _to_arrow_parallel(
self,
splits: List[Split],
schema: pyarrow.Schema,
effective: int,
) -> pyarrow.Table:
"""Read ``splits`` concurrently and assemble the result in input order.

Each split is read in its own worker thread; row quota for ``limit``
is shared through :class:`_RemainingRows` so the combined output
never exceeds ``self.limit`` rows. Per-split batches are collected
by submission index, so the merged table preserves the order of the
input ``splits`` list.
"""
remaining_state = _RemainingRows(self.limit)
results: List[Optional[List[pyarrow.RecordBatch]]] = [None] * len(splits)
workers = min(effective, len(splits))
with ThreadPoolExecutor(
max_workers=workers,
thread_name_prefix="pypaimon-read",
) as executor:
futures = {
executor.submit(
self._read_one_split_to_batches,
split,
schema,
remaining_state,
): idx
for idx, split in enumerate(splits)
}
for fut in as_completed(futures):
results[futures[fut]] = fut.result()

table_list: List[pyarrow.RecordBatch] = []
for split_batches in results:
if split_batches is None:
continue
for batch in split_batches:
if batch.num_rows == 0:
continue
table_list.append(self._try_to_pad_batch_by_schema(batch, schema))

if not table_list:
return pyarrow.Table.from_arrays(
[pyarrow.array([], type=field.type) for field in schema],
schema=schema,
)
return pyarrow.Table.from_batches(table_list)

def _read_one_split_to_batches(
self,
split: Split,
schema: pyarrow.Schema,
remaining_state: _RemainingRows,
) -> List[pyarrow.RecordBatch]:
"""Read a single split into arrow batches under soft-stop control.

Row quota is debited against the shared ``remaining_state``; once a
request returns 0, the worker stops emitting further batches. The
reader is always closed via ``finally``.
"""
chunk_size = 65536
out: List[pyarrow.RecordBatch] = []
reader = self._create_split_read(split).create_reader()
try:
if isinstance(reader, RecordBatchReader):
for batch in iter(reader.read_arrow_batch, None):
allowed = remaining_state.try_consume(batch.num_rows)
if allowed == 0:
break
if allowed < batch.num_rows:
batch = batch.slice(0, allowed)
if self.include_row_kind:
batch = self._add_row_kind_column_to_batch(batch, "+I")
out.append(batch)
if remaining_state.exhausted():
break
else:
row_tuple_chunk: List[tuple] = []
row_kind_chunk: List[str] = []
stop = False
while not stop:
row_iterator = reader.read_batch()
if row_iterator is None:
break
for row in iter(row_iterator.next, None):
if not isinstance(row, OffsetRow):
raise TypeError(
f"Expected OffsetRow, but got {type(row).__name__}")
if remaining_state.try_consume(1) == 0:
stop = True
break
row_tuple_chunk.append(
row.row_tuple[row.offset: row.offset + row.arity])
if self.include_row_kind:
row_kind_chunk.append(row.get_row_kind().to_string())

if len(row_tuple_chunk) >= chunk_size:
out.append(self._convert_rows_to_arrow_batch_with_row_kind(
row_tuple_chunk, row_kind_chunk, schema))
row_tuple_chunk = []
row_kind_chunk = []
if row_tuple_chunk:
out.append(self._convert_rows_to_arrow_batch_with_row_kind(
row_tuple_chunk, row_kind_chunk, schema))
finally:
reader.close()
return out

def _convert_rows_to_arrow_batch_with_row_kind(
self,
row_tuples: List[tuple],
Expand Down Expand Up @@ -216,8 +415,16 @@ def _add_row_kind_column_to_batch(
columns = [row_kind_array] + [batch.column(i) for i in range(batch.num_columns)]
return pyarrow.RecordBatch.from_arrays(columns, schema=new_schema)

def to_pandas(self, splits: List[Split]) -> pandas.DataFrame:
arrow_table = self.to_arrow(splits)
def to_pandas(
self,
splits: List[Split],
parallelism: Optional[int] = None,
) -> pandas.DataFrame:
"""Read ``splits`` into a pandas ``DataFrame``.

See :meth:`to_arrow` for the semantics of ``parallelism``.
"""
arrow_table = self.to_arrow(splits, parallelism=parallelism)
return arrow_table.to_pandas()

def to_duckdb(self, splits: List[Split], table_name: str,
Expand Down
Loading
Loading