Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion tests/table/test_upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import datetime
from pathlib import PosixPath

import pyarrow as pa
Expand All @@ -26,11 +27,13 @@
from pyiceberg.expressions import AlwaysTrue, And, EqualTo, Reference
from pyiceberg.expressions.literals import LongLiteral
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table, UpsertResult
from pyiceberg.table.snapshots import Operation
from pyiceberg.table.upsert_util import create_match_filter
from pyiceberg.types import IntegerType, NestedField, StringType, StructType
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import DateType, IntegerType, NestedField, StringType, StructType
from tests.catalog.test_base import InMemoryCatalog


Expand Down Expand Up @@ -714,6 +717,48 @@ def test_upsert_with_nulls(catalog: Catalog) -> None:
)


def test_upsert_updates_existing_row_when_non_join_partition_value_changes(catalog: Catalog) -> None:
identifier = "default.test_upsert_non_join_partition_value_change"
_drop_table(catalog, identifier)

schema = Schema(
NestedField(1, "order_id", IntegerType(), required=True),
NestedField(2, "order_date", DateType(), required=True),
NestedField(3, "order_type", StringType(), required=True),
)
spec = PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="order_date"))
table = catalog.create_table(identifier, schema=schema, partition_spec=spec)

arrow_schema = pa.schema(
[
pa.field("order_id", pa.int32(), nullable=False),
pa.field("order_date", pa.date32(), nullable=False),
pa.field("order_type", pa.string(), nullable=False),
]
)
table.append(
pa.Table.from_pylist(
[{"order_id": 1, "order_date": datetime.date(2026, 1, 1), "order_type": "A"}],
schema=arrow_schema,
)
)

result = table.upsert(
pa.Table.from_pylist(
[{"order_id": 1, "order_date": datetime.date(2026, 1, 2), "order_type": "B"}],
schema=arrow_schema,
),
join_cols=["order_id"],
)

assert result.rows_updated == 1
assert result.rows_inserted == 0
assert table.scan().to_arrow() == pa.Table.from_pylist(
[{"order_id": 1, "order_date": datetime.date(2026, 1, 2), "order_type": "B"}],
schema=arrow_schema,
)


def test_transaction(catalog: Catalog) -> None:
"""Test the upsert within a Transaction. Make sure that if something fails the entire Transaction is
rolled back."""
Expand Down