diff --git a/pyproject.toml b/pyproject.toml index 03d98d9c..be05e5b9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "c2pa-python" -version = "0.34.0" +version = "0.35.0" requires-python = ">=3.10" description = "Python bindings for the C2PA Content Authenticity Initiative (CAI) library" readme = { file = "README.md", content-type = "text/markdown" } diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py index a4fcdef5..ca14469b 100644 --- a/src/c2pa/c2pa.py +++ b/src/c2pa/c2pa.py @@ -22,7 +22,7 @@ from pathlib import Path from typing import Optional, Union, Callable, Any, overload import io -from .lib import dynamically_load_library +from .lib import dynamically_load_library, record_owner_pid, is_foreign_process import mimetypes from itertools import count @@ -238,6 +238,7 @@ def __init__(self): self._lifecycle_state = LifecycleState.UNINITIALIZED self._handle = None _clear_error_state() + record_owner_pid(self) @staticmethod def _free_native_ptr(ptr): @@ -280,6 +281,8 @@ def _mark_consumed(self): def _cleanup_resources(self): """Release native resources idempotently.""" try: + if is_foreign_process(self): + return if ( hasattr(self, '_lifecycle_state') and self._lifecycle_state != LifecycleState.CLOSED @@ -1768,6 +1771,7 @@ def flush_callback(ctx): raise Exception("Failed to create stream: {}".format(error)) self._initialized = True + record_owner_pid(self) def __enter__(self): """Context manager entry.""" @@ -1786,6 +1790,8 @@ def __del__(self): hasn't been explicitly closed. """ try: + if is_foreign_process(self): + return # Only cleanup if not already closed and we have a valid stream if hasattr(self, '_closed') and not self._closed: stream = self._stream @@ -1816,6 +1822,10 @@ def close(self): """ if self._closed: return + if is_foreign_process(self): + self._closed = True + self._initialized = False + return try: # Clean up stream first as it depends on callbacks diff --git a/src/c2pa/lib.py b/src/c2pa/lib.py index b1caa104..be6353f8 100644 --- a/src/c2pa/lib.py +++ b/src/c2pa/lib.py @@ -299,3 +299,28 @@ def dynamically_load_library( raise RuntimeError(f"Could not find {c2pa_lib_name} in any of the search paths") return c2pa_lib + + +def record_owner_pid(obj): + """Keep the PID that created this native-handle wrapper + (call from __init__ as needed). + """ + obj._owner_pid = os.getpid() + + +def is_foreign_process(obj): + """Return True when this object is being finalized in a forked child that did not + create it. After a multithreaded fork(), native mutexes may be held by threads + absent in the child -> any lock() call deadlocks. Callers must skip native frees + when this returns True. + + Skipping the free does not cause a cumulative leak. If the child calls exec() the + address space is replaced entirely; if it exits, the OS reclaims all process memory. + Even a long-lived fork child (e.g. a multiprocessing fork-start worker) leaks at most + the objects inherited at fork time — a one-time, bounded amount reclaimed when the + child terminates. Objects the child creates itself carry the child's PID and are + freed normally. + + Defensive default: if _owner_pid was never set, returns False (no regression).""" + owner = getattr(obj, '_owner_pid', None) + return owner is not None and owner != os.getpid() diff --git a/tests/perf/baseline.json b/tests/perf/baseline.json index 3e436967..db034858 100644 --- a/tests/perf/baseline.json +++ b/tests/perf/baseline.json @@ -2,154 +2,189 @@ "_meta": { "memray_version": "1.19.3", "python_version": "3.12.13", - "c2pa_native_version": "c2pa-v0.86.1", - "iterations": 100, + "c2pa_native_version": "c2pa-v0.88.0", + "iterations": 250, "perf_env": "python-3.12-slim", "arch": "aarch64" }, "reader_jpeg_legacy": { - "peak_bytes": 3730321, - "leaked_bytes": 3236992, - "total_allocations": 717596 + "peak_bytes": 3762136, + "leaked_bytes": 3262031, + "total_allocations": 1660100 }, "reader_jpeg_with_context": { - "peak_bytes": 3724412, - "leaked_bytes": 3229219, - "total_allocations": 711543 + "peak_bytes": 3756118, + "leaked_bytes": 3254069, + "total_allocations": 1646550 }, "reader_mp4": { - "peak_bytes": 4099225, - "leaked_bytes": 3228160, - "total_allocations": 2084373 + "peak_bytes": 4208976, + "leaked_bytes": 3253426, + "total_allocations": 4963836 }, "reader_wav": { - "peak_bytes": 4399719, - "leaked_bytes": 3238102, - "total_allocations": 408059 + "peak_bytes": 4431268, + "leaked_bytes": 3263354, + "total_allocations": 888612 }, "builder_sign_jpeg_legacy": { - "peak_bytes": 7663441, - "leaked_bytes": 3352658, - "total_allocations": 555922 + "peak_bytes": 7694334, + "leaked_bytes": 3377383, + "total_allocations": 1250156 }, "builder_sign_jpeg_with_context": { - "peak_bytes": 7656560, - "leaked_bytes": 3345863, - "total_allocations": 550105 + "peak_bytes": 7687683, + "leaked_bytes": 3370546, + "total_allocations": 1235555 }, "builder_sign_png_legacy": { - "peak_bytes": 7900956, - "leaked_bytes": 3351994, - "total_allocations": 1978914 + "peak_bytes": 7933540, + "leaked_bytes": 3377209, + "total_allocations": 4502325 }, "builder_sign_png_with_context": { - "peak_bytes": 7893973, - "leaked_bytes": 3345450, - "total_allocations": 1973003 + "peak_bytes": 7924904, + "leaked_bytes": 3370312, + "total_allocations": 4487743 }, "builder_sign_jpeg_parallel_split_pool": { - "peak_bytes": 45726143, - "leaked_bytes": 3714796, - "total_allocations": 557891 + "peak_bytes": 45790186, + "leaked_bytes": 3770876, + "total_allocations": 1247400 }, "builder_sign_jpeg_parallel_split_barrier": { - "peak_bytes": 45817488, - "leaked_bytes": 3780629, - "total_allocations": 627768 + "peak_bytes": 45758269, + "leaked_bytes": 3769617, + "total_allocations": 1246063 }, "builder_sign_png_parallel_split_pool": { - "peak_bytes": 40563556, - "leaked_bytes": 3746819, - "total_allocations": 1984928 + "peak_bytes": 42884934, + "leaked_bytes": 3806845, + "total_allocations": 4499456 }, "builder_sign_png_parallel_split_barrier": { - "peak_bytes": 45964496, - "leaked_bytes": 3745249, - "total_allocations": 1983686 + "peak_bytes": 45995848, + "leaked_bytes": 3805431, + "total_allocations": 4498202 }, "builder_sign_gif": { - "peak_bytes": 14514114, - "leaked_bytes": 3345545, - "total_allocations": 8547131 + "peak_bytes": 14545704, + "leaked_bytes": 3369959, + "total_allocations": 19592169 }, "builder_sign_heic": { - "peak_bytes": 7717414, - "leaked_bytes": 3381771, - "total_allocations": 877927 + "peak_bytes": 4609675, + "leaked_bytes": 3369960, + "total_allocations": 1865279 }, "builder_sign_m4a": { - "peak_bytes": 18817771, - "leaked_bytes": 3345503, - "total_allocations": 2627261 + "peak_bytes": 18848657, + "leaked_bytes": 3369911, + "total_allocations": 6143845 }, "builder_sign_webp": { - "peak_bytes": 8869451, - "leaked_bytes": 3345563, - "total_allocations": 496534 + "peak_bytes": 8901476, + "leaked_bytes": 3369960, + "total_allocations": 1108971 }, "builder_sign_avi": { - "peak_bytes": 7009007, - "leaked_bytes": 3345266, - "total_allocations": 45029611 + "peak_bytes": 7041162, + "leaked_bytes": 3369959, + "total_allocations": 105383089 }, "builder_sign_mp4": { - "peak_bytes": 6131977, - "leaked_bytes": 3345600, - "total_allocations": 1923444 + "peak_bytes": 6163626, + "leaked_bytes": 3369959, + "total_allocations": 4502723 }, "builder_sign_tiff": { - "peak_bytes": 13091168, - "leaked_bytes": 3345348, - "total_allocations": 5469122 + "peak_bytes": 13123408, + "leaked_bytes": 3369960, + "total_allocations": 13221796 }, "builder_sign_jpeg_parent_of": { - "peak_bytes": 14143351, - "leaked_bytes": 3345698, - "total_allocations": 1285766 + "peak_bytes": 14175499, + "leaked_bytes": 3370412, + "total_allocations": 3049271 }, "builder_sign_jpeg_component_of": { - "peak_bytes": 14144869, - "leaked_bytes": 3345779, - "total_allocations": 1308244 + "peak_bytes": 14176177, + "leaked_bytes": 3370939, + "total_allocations": 3105680 }, "builder_sign_jpeg_parent_and_component": { - "peak_bytes": 14434957, - "leaked_bytes": 3450621, - "total_allocations": 2289962 + "peak_bytes": 14521625, + "leaked_bytes": 3466959, + "total_allocations": 5528187 }, "builder_sign_jpeg_parent_and_component_mixed_mime": { - "peak_bytes": 14445750, - "leaked_bytes": 3345959, - "total_allocations": 2787986 + "peak_bytes": 14476222, + "leaked_bytes": 3370421, + "total_allocations": 6500129 }, "builder_sign_jpeg_two_components_same_mime": { - "peak_bytes": 14432257, - "leaked_bytes": 3442393, - "total_allocations": 2279745 + "peak_bytes": 14584523, + "leaked_bytes": 3506901, + "total_allocations": 5502714 }, "builder_sign_jpeg_two_components_mixed_mime": { - "peak_bytes": 14443122, - "leaked_bytes": 3346165, - "total_allocations": 2777653 + "peak_bytes": 14473585, + "leaked_bytes": 3370669, + "total_allocations": 6474343 }, "builder_sign_jpeg_archive_roundtrip": { - "peak_bytes": 14175766, - "leaked_bytes": 3365189, - "total_allocations": 1767740 + "peak_bytes": 14206327, + "leaked_bytes": 3389587, + "total_allocations": 4247160 }, "reader_error_no_manifest": { - "peak_bytes": 3443163, - "leaked_bytes": 3207515, - "total_allocations": 173242 + "peak_bytes": 3474039, + "leaked_bytes": 3232411, + "total_allocations": 303795 }, "builder_error_invalid_manifest": { - "peak_bytes": 3243627, - "leaked_bytes": 3186717, - "total_allocations": 95461 + "peak_bytes": 3271093, + "leaked_bytes": 3211670, + "total_allocations": 120613 }, "reader_string_apis": { - "peak_bytes": 3857136, - "leaked_bytes": 3229581, - "total_allocations": 1178409 + "peak_bytes": 3888863, + "leaked_bytes": 3254426, + "total_allocations": 2806719 + }, + "fork_reader_collect": { + "peak_bytes": 3760272, + "leaked_bytes": 3261839, + "total_allocations": 1615850 + }, + "fork_contended_mutex": { + "peak_bytes": 7585897, + "leaked_bytes": 3392170, + "total_allocations": 82616370 + }, + "fork_thread_local_orphan": { + "peak_bytes": 3845601, + "leaked_bytes": 3348583, + "total_allocations": 1681204 + }, + "fork_gc_cycle": { + "peak_bytes": 3760522, + "leaked_bytes": 3261588, + "total_allocations": 1620079 + }, + "fork_parent_frees_after_fork": { + "peak_bytes": 5959081, + "leaked_bytes": 3260442, + "total_allocations": 30560082 + }, + "fork_child_sys_exit": { + "peak_bytes": 3760808, + "leaked_bytes": 3261849, + "total_allocations": 1624602 + }, + "fork_stream_cleanup": { + "peak_bytes": 3382176, + "leaked_bytes": 3210242, + "total_allocations": 111383 } } \ No newline at end of file diff --git a/tests/perf/scenarios.py b/tests/perf/scenarios.py index e2945294..7598c4b9 100644 --- a/tests/perf/scenarios.py +++ b/tests/perf/scenarios.py @@ -9,6 +9,7 @@ Each function is called N times by run_profile.py. """ +import gc import io import json import os @@ -16,6 +17,7 @@ import threading from concurrent.futures import ThreadPoolExecutor from pathlib import Path +from types import SimpleNamespace from c2pa import ( Builder, C2paError, @@ -23,6 +25,7 @@ Context, Reader, Signer, + Stream, ) FIXTURES_DIR = Path(__file__).parent.parent / "fixtures" @@ -545,6 +548,183 @@ def scenario_builder_sign_png_parallel_split_barrier(iterations: int = 100) -> N _sign_parallel(SIGNING_PNG, "image/png", iterations, per_thread_full=False, launch="barrier") +def _fork_wait(child_fn) -> None: + """Fork; run child_fn() in child then _exit(0); parent waits up to 5 s.""" + import signal + + def _on_alarm(signum, frame): + raise TimeoutError("fork child deadlocked — 5 s alarm fired") + + pid = os.fork() + if pid == 0: + child_fn() + os._exit(0) + + old = signal.signal(signal.SIGALRM, _on_alarm) + try: + signal.alarm(5) + _, status = os.waitpid(pid, 0) + signal.alarm(0) + finally: + signal.signal(signal.SIGALRM, old) + assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0, ( + f"child exited abnormally: status={status}" + ) + + +def scenario_fork_reader_collect(iterations: int = 100) -> None: + """Fork safety benchmark scenario: + Baseline: create Reader, fork, child gc.collect() + _exit, parent closes. + Guard fires in child (no deadlock); parent frees normally (no leak). + """ + if not hasattr(os, "fork"): + return + for _ in _iterate(iterations): + with open(SIGNED_JPEG, "rb") as f: + reader = Reader("image/jpeg", f) + _fork_wait(lambda: gc.collect()) + reader.close() + + +def scenario_fork_contended_mutex(iterations: int = 100) -> None: + """Fork safety benchmark scenario: + 8 threads create/close Readers in a tight loop while the main thread + forks 5× per iteration (500 total forks). Maximises the probability that + the registry Mutex is held at the instant of fork(). Each fork inherits + a Reader created by the main thread; the child explicitly closes it + (then runs GC), so the PID guard is exercised on every fork — without + the guard the close would call into the native library and could + deadlock on a mutex left locked by a vanished worker thread. The parent + closes the same Reader after the child exits (its own PID: real free). + + Note: the workers' Readers are pinned by frozen thread frames in the + child, so child gc.collect() alone would free nothing — hence the + explicit close of an inherited object. + """ + if not hasattr(os, "fork"): + return + stop = threading.Event() + + def _worker(): + while not stop.is_set(): + with open(SIGNED_JPEG, "rb") as f: + r = Reader("image/jpeg", f) + r.close() + + threads = [threading.Thread(target=_worker, daemon=True) + for _ in range(8)] + for t in threads: + t.start() + try: + for _ in _iterate(iterations): + for _ in range(5): + with open(SIGNED_JPEG, "rb") as f: + reader = Reader("image/jpeg", f) + + def _child(r=reader): + r.close() + gc.collect() + + _fork_wait(_child) + reader.close() + finally: + stop.set() + for t in threads: + t.join(timeout=5) + + +def scenario_fork_thread_local_orphan(iterations: int = 100) -> None: + """Fork safety benchmark scenario: + A thread stores Reader in threading.local, joins, then main forks. + """ + if not hasattr(os, "fork"): + return + for _ in _iterate(iterations): + tl = threading.local() + + def _create(): + with open(SIGNED_JPEG, "rb") as f: + tl.reader = Reader("image/jpeg", f) + + t = threading.Thread(target=_create) + t.start() + t.join() + _fork_wait(lambda: gc.collect()) + + +def scenario_fork_gc_cycle(iterations: int = 100) -> None: + """Fork safety benchmark scenario: + Reader in a reference cycle, freed only by cyclic GC, not refcounting. + Child calls gc.collect(), which triggers __del__ on the Reader. + """ + if not hasattr(os, "fork"): + return + for _ in _iterate(iterations): + with open(SIGNED_JPEG, "rb") as f: + reader = Reader("image/jpeg", f) + container = SimpleNamespace(reader=reader) + reader.container = container # cycle: reader ↔ container + del reader, container # refcount > 0; cycle survives until GC + + _fork_wait(lambda: gc.collect()) + gc.collect() # parent cleans up + + +def scenario_fork_parent_frees_after_fork(iterations: int = 100) -> None: + """Fork safety benchmark scenario: + 20 Readers created, fork, child exits immediately, parent closes all 20. + Primary false-positive test: if is_foreign_process() wrongly fires in the + parent, all 20 native frees are skipped and leaked_bytes spikes ~20x. + """ + if not hasattr(os, "fork"): + return + for _ in _iterate(iterations): + readers = [] + for _ in range(20): + with open(SIGNED_JPEG, "rb") as f: + readers.append(Reader("image/jpeg", f)) + _fork_wait(lambda: None) # child does nothing, exits 0 + for r in readers: + r.close() + + +def scenario_fork_child_sys_exit(iterations: int = 100) -> None: + """Fork safety benchmark scenario: + Child calls sys.exit(0), full Python shutdown: atexit, finalizers, GC. + Every native-handle wrapper's __del__ fires in the child. Guard must + survive Py_Finalize() without deadlocking. + """ + if not hasattr(os, "fork"): + return + for _ in _iterate(iterations): + with open(SIGNED_JPEG, "rb") as f: + reader = Reader("image/jpeg", f) + context = Context() + + def _child(): + import sys as _sys + _sys.exit(0) # full Python shutdown, not _exit + + _fork_wait(_child) + reader.close() + context.close() + + +def scenario_fork_stream_cleanup(iterations: int = 100) -> None: + """Fork safety benchmark scenario: + Stream wraps a BytesIO with ctypes callbacks stored as instance attributes. + Both Stream.__del__ and Stream.close carry fork guards. This tests the + stream-specific path (separate from ManagedResource). + """ + if not hasattr(os, "fork"): + return + source_bytes = SIGNED_JPEG.read_bytes() + for _ in _iterate(iterations): + stream = Stream(io.BytesIO(source_bytes)) + _fork_wait(lambda: gc.collect()) + stream.close() + + SCENARIOS = { "reader_jpeg_legacy": scenario_reader_jpeg_legacy, "reader_jpeg_with_context": scenario_reader_jpeg_with_context, @@ -575,6 +755,13 @@ def scenario_builder_sign_png_parallel_split_barrier(iterations: int = 100) -> N "reader_error_no_manifest": scenario_reader_error_no_manifest, "builder_error_invalid_manifest": scenario_builder_error_invalid_manifest, "reader_string_apis": scenario_reader_string_apis, + "fork_reader_collect": scenario_fork_reader_collect, + "fork_contended_mutex": scenario_fork_contended_mutex, + "fork_thread_local_orphan": scenario_fork_thread_local_orphan, + "fork_gc_cycle": scenario_fork_gc_cycle, + "fork_parent_frees_after_fork": scenario_fork_parent_frees_after_fork, + "fork_child_sys_exit": scenario_fork_child_sys_exit, + "fork_stream_cleanup": scenario_fork_stream_cleanup, } diff --git a/tests/test_unit_tests_threaded.py b/tests/test_unit_tests_threaded.py index 8eaacf88..efb7ba27 100644 --- a/tests/test_unit_tests_threaded.py +++ b/tests/test_unit_tests_threaded.py @@ -11,6 +11,7 @@ # specific language governing permissions and limitations under # each license. +import ctypes import os import io import json @@ -20,13 +21,172 @@ import time import asyncio import random +from unittest.mock import MagicMock, patch from c2pa import Builder, C2paError as Error, Reader, C2paSigningAlg as SigningAlg, C2paSignerInfo, Signer, sdk_version # noqa: E501 from c2pa import Context, Settings -from c2pa.c2pa import Stream +from c2pa.c2pa import ManagedResource, Stream, LifecycleState +from c2pa.lib import is_foreign_process, record_owner_pid PROJECT_PATH = os.getcwd() FIXTURES_FOLDER = os.path.join(os.path.dirname(__file__), "fixtures") + + +class _ConcreteResource(ManagedResource): + """Minimal concrete subclass for testing ManagedResource cleanup.""" + + +def _make_resource(pid_offset): + """Construct a ManagedResource-like object without triggering native init. + + pid_offset=1 → simulates a forked child (foreign PID) + pid_offset=0 → same process (normal cleanup) + pid_offset=None → no _owner_pid stamp (backward-compat: no protection) + """ + obj = object.__new__(_ConcreteResource) + obj._lifecycle_state = LifecycleState.ACTIVE + obj._handle = ctypes.c_void_p(1) # non-None, non-zero sentinel + if pid_offset is not None: + obj._owner_pid = os.getpid() + pid_offset + return obj + + +def _make_stream(pid_offset): + """Construct a Stream-like object without triggering native init.""" + obj = object.__new__(Stream) + obj._closed = False + obj._initialized = True + obj._stream = MagicMock() # non-None stream handle + if pid_offset is not None: + obj._owner_pid = os.getpid() + pid_offset + return obj + + +class TestManagedResourceForkGuard(unittest.TestCase): + """Fork-safety unit tests for ManagedResource and Stream. + + Verifies that the is_foreign_process() PID guard prevents native frees + from running in a forked child process (where native mutexes may be held + by threads that no longer exist, causing deadlock before exec()). + + No real fork or auth credentials are required; PID mismatch is simulated + by setting _owner_pid = os.getpid() + 1. + """ + + def test_foreign_pid_skips_free(self): + """In a forked child (pid_offset=1), no native free should run.""" + obj = _make_resource(pid_offset=1) + with patch('c2pa.c2pa._lib') as mock_lib: + obj._cleanup_resources() + mock_lib.c2pa_free.assert_not_called() + + def test_own_pid_calls_free(self): + """In the owning process, cleanup must call c2pa_free normally.""" + obj = _make_resource(pid_offset=0) + expected_handle = obj._handle + with patch('c2pa.c2pa._lib'): + with patch.object(ManagedResource, '_free_native_ptr') as mock_free: + obj._cleanup_resources() + mock_free.assert_called_once_with(expected_handle) + + def test_no_stamp_calls_free(self): + """No _owner_pid (backward-compat) must NOT suppress cleanup.""" + obj = _make_resource(pid_offset=None) + with patch.object(ManagedResource, '_free_native_ptr') as mock_free: + obj._cleanup_resources() + mock_free.assert_called_once() + + def test_foreign_pid_leaves_state_unchanged(self): + """Guard returns early; lifecycle state stays ACTIVE (not CLOSED).""" + obj = _make_resource(pid_offset=1) + with patch('c2pa.c2pa._lib'): + obj._cleanup_resources() + self.assertEqual(obj._lifecycle_state, LifecycleState.ACTIVE) + + def test_double_cleanup_is_idempotent(self): + """Second call is a no-op after successful first cleanup.""" + obj = _make_resource(pid_offset=0) + with patch.object(ManagedResource, '_free_native_ptr') as mock_free: + obj._cleanup_resources() + obj._cleanup_resources() + mock_free.assert_called_once() + + def test_foreign_pid_skips_release_via_del(self): + obj = _make_stream(pid_offset=1) + with patch('c2pa.c2pa._lib') as mock_lib: + obj.__del__() + mock_lib.c2pa_release_stream.assert_not_called() + + def test_own_pid_releases_stream_via_del(self): + obj = _make_stream(pid_offset=0) + with patch('c2pa.c2pa._lib') as mock_lib: + obj.__del__() + mock_lib.c2pa_release_stream.assert_called_once() + + def test_no_stamp_releases_stream_via_del(self): + obj = _make_stream(pid_offset=None) + with patch('c2pa.c2pa._lib') as mock_lib: + obj.__del__() + mock_lib.c2pa_release_stream.assert_called_once() + + def test_already_closed_is_noop_via_del(self): + obj = _make_stream(pid_offset=0) + obj._closed = True + with patch('c2pa.c2pa._lib') as mock_lib: + obj.__del__() + mock_lib.c2pa_release_stream.assert_not_called() + + def test_foreign_pid_skips_release_via_close(self): + obj = _make_stream(pid_offset=1) + with patch('c2pa.c2pa._lib') as mock_lib: + obj.close() + mock_lib.c2pa_release_stream.assert_not_called() + self.assertTrue(obj._closed) + + def test_own_pid_releases_stream_via_close(self): + obj = _make_stream(pid_offset=0) + with patch('c2pa.c2pa._lib') as mock_lib: + obj.close() + mock_lib.c2pa_release_stream.assert_called_once() + + def test_no_stamp_releases_stream_via_close(self): + obj = _make_stream(pid_offset=None) + with patch('c2pa.c2pa._lib') as mock_lib: + obj.close() + mock_lib.c2pa_release_stream.assert_called_once() + + def test_already_closed_is_noop_via_close(self): + obj = _make_stream(pid_offset=0) + obj._closed = True + with patch('c2pa.c2pa._lib') as mock_lib: + obj.close() + mock_lib.c2pa_release_stream.assert_not_called() + + def test_foreign_pid_close_marks_closed(self): + """close() in forked child must set _closed=True to prevent re-entry, + and _initialized=False so the public properties report a closed stream.""" + obj = _make_stream(pid_offset=1) + with patch('c2pa.c2pa._lib'): + obj.close() + self.assertTrue(obj._closed) + self.assertFalse(obj._initialized) + + +class TestHelpers(unittest.TestCase): + + def test_record_and_detect_own_pid(self): + obj = MagicMock() + record_owner_pid(obj) + self.assertFalse(is_foreign_process(obj)) + + def test_detect_foreign_pid(self): + obj = MagicMock() + obj._owner_pid = os.getpid() + 1 + self.assertTrue(is_foreign_process(obj)) + + def test_no_stamp_not_foreign(self): + obj = MagicMock(spec=[]) # no _owner_pid attribute + self.assertFalse(is_foreign_process(obj)) DEFAULT_TEST_FILE = os.path.join(FIXTURES_FOLDER, "C.jpg") INGREDIENT_TEST_FILE = os.path.join(FIXTURES_FOLDER, "A.jpg") ALTERNATIVE_INGREDIENT_TEST_FILE = os.path.join(FIXTURES_FOLDER, "cloud.jpg")