diff --git a/sqlite_utils/db.py b/sqlite_utils/db.py index ed3fc7af2..08f5d3481 100644 --- a/sqlite_utils/db.py +++ b/sqlite_utils/db.py @@ -3604,18 +3604,42 @@ def insert_all( if num_records_processed == 1: # For an insert we need to use result.lastrowid if not upsert and result is not None: - self.last_rowid = result.lastrowid - if (hash_id or pk) and self.last_rowid: - # Set self.last_pk to the pk(s) for that rowid - row = list(self.rows_where("rowid = ?", [self.last_rowid]))[0] - if hash_id: - self.last_pk = row[hash_id] - elif isinstance(pk, str): - self.last_pk = row[pk] + ignored_insert = ignore and result.rowcount == 0 + if ignored_insert: + if list_mode: + first_record_list = cast(Sequence[Any], first_record) + if hash_id: + pass + elif isinstance(pk, str): + pk_index = column_names.index(pk) + self.last_pk = first_record_list[pk_index] + elif pk: + self.last_pk = tuple( + first_record_list[column_names.index(p)] for p in pk + ) else: - self.last_pk = tuple(row[p] for p in pk) + first_record_dict = cast(Dict[str, Any], first_record) + if hash_id: + self.last_pk = hash_record( + first_record_dict, hash_id_columns + ) + elif isinstance(pk, str): + self.last_pk = first_record_dict[pk] + elif pk: + self.last_pk = tuple(first_record_dict[p] for p in pk) else: - self.last_pk = self.last_rowid + self.last_rowid = result.lastrowid + if (hash_id or pk) and self.last_rowid: + # Set self.last_pk to the pk(s) for that rowid + row = list(self.rows_where("rowid = ?", [self.last_rowid]))[0] + if hash_id: + self.last_pk = row[hash_id] + elif isinstance(pk, str): + self.last_pk = row[pk] + else: + self.last_pk = tuple(row[p] for p in pk) + else: + self.last_pk = self.last_rowid else: # For an upsert use first_record from earlier if list_mode: diff --git a/tests/test_create.py b/tests/test_create.py index b1a6ad1f8..6f4338886 100644 --- a/tests/test_create.py +++ b/tests/test_create.py @@ -937,6 +937,23 @@ def test_insert_ignore(fresh_db): assert rows == [{"id": 1, "bar": 2}] +def test_insert_ignore_with_pk_after_other_table_insert(fresh_db): + user = {"id": "abc", "name": "david"} + + fresh_db["users"].insert(user, pk="id") + fresh_db["comments"].insert_all( + [ + {"id": "def", "text": "ok"}, + {"id": "ghi", "text": "great"}, + ], + ) + + table = fresh_db["users"].insert(user, pk="id", ignore=True) + + assert table.last_pk == "abc" + assert list(fresh_db["users"].rows) == [user] + + def test_insert_hash_id(fresh_db): dogs = fresh_db["dogs"] id = dogs.insert({"name": "Cleo", "twitter": "cleopaws"}, hash_id="id").last_pk