fix(resource_manager): reinitialize consumer threads after os.fork() #1658
pyg410 wants to merge 2 commits into
Signed-off-by: 박영규 <pyg410@naver.com>
    def _at_fork_reinit(self) -> None:
        """Reinitialize consumer threads after fork in child process.

        Called automatically via os.register_at_fork() after fork().
        Necessary for Gunicorn --preload deployments where os.fork() is used:
        threads are not copied to child processes (POSIX standard), so without
        reinitialization, the child process has no consumer threads and all
        media upload and score ingestion events are silently lost.

        Note: LangfuseSpanProcessor (BatchSpanProcessor) handles span export
        fork-safety separately via its own os.register_at_fork handler.

        Skipped if shutdown() was already called on this instance, to avoid
        restarting threads on an intentionally torn-down manager.
        """
        if self._shutdown:
            return

        langfuse_logger.debug(
            f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads."
        )

        # Queues are intentionally recreated after fork. Items enqueued before fork
        # belong to the preloaded parent process and must not be processed by every
        # worker; otherwise uploads/scores would be duplicated across workers.
        #
        # HTTP clients (self.httpx_client, self._score_ingestion_client) are not recreated
        # here to keep this handler minimal; this mirrors the existing singleton client
        # lifecycle. If preload-time network I/O is introduced in the future, clients
        # may need fork-specific reinitialization as well.
        self._init_consumer_threads()

        langfuse_logger.debug(
            f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork"
        )
Unhandled exception in after_in_child callback crashes Gunicorn worker
_at_fork_reinit calls _init_consumer_threads, which calls Thread.start(). If the OS refuses to create a thread — e.g., due to resource exhaustion (OSError: can't start new thread) — the exception propagates through the after_in_child callback chain and surfaces as an exception from os.fork() in the child process. Gunicorn would see the worker fail immediately at startup rather than the telemetry path degrading gracefully. Wrapping _init_consumer_threads() in a try/except Exception and logging the error would allow the child to continue (without consumer threads) instead of crashing.
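A minimal sketch of the guard this comment suggests, assuming a simplified manager. `ConsumerManager`, its `thread_starter` parameter, and the boolean return value are hypothetical and not part of the Langfuse codebase; only the names `_at_fork_reinit`, `_init_consumer_threads`, and `_shutdown` come from the PR.

```python
import logging
import threading
from typing import Callable, List

logger = logging.getLogger("fork_reinit_sketch")


class ConsumerManager:
    """Hypothetical stand-in for a manager that owns background consumer threads."""

    def __init__(self, thread_starter: Callable[[], List[threading.Thread]]) -> None:
        # thread_starter is an illustrative hook that creates and starts the consumers
        self._thread_starter = thread_starter
        self._shutdown = False
        self.consumers: List[threading.Thread] = []

    def _init_consumer_threads(self) -> None:
        self.consumers = self._thread_starter()

    def _at_fork_reinit(self) -> bool:
        """Return True if consumers were restarted, False otherwise."""
        if self._shutdown:
            return False
        try:
            self._init_consumer_threads()
        except Exception:
            # Log and carry on: the worker keeps serving requests,
            # but runs without consumer threads.
            logger.exception("Failed to reinitialize consumer threads after fork")
            self.consumers = []
            return False
        return True
```

Catching the error inside the handler means the failure is reported through the application's own logger, whatever the interpreter would do with an exception that escaped the callback.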
        # Queues are intentionally recreated after fork. Items enqueued before fork
        # belong to the preloaded parent process and must not be processed by every
        # worker; otherwise uploads/scores would be duplicated across workers.
        #
        # HTTP clients (self.httpx_client, self._score_ingestion_client) are not recreated
        # here to keep this handler minimal; this mirrors the existing singleton client
        # lifecycle. If preload-time network I/O is introduced in the future, clients
        # may need fork-specific reinitialization as well.
        self._init_consumer_threads()
Inherited httpx.Client connection pool is not fork-safe
self.httpx_client and self._score_ingestion_client (which holds a reference to self.httpx_client as its session) are not recreated in _at_fork_reinit. After os.fork(), the child inherits duplicated file descriptors from the parent's connection pool. The parent and child share open TCP connections at the OS level; if the parent closes or recycles a connection, the child's pool may try to reuse a dead FD, causing connection errors on the first outbound requests from each worker. The existing comment acknowledges this but marks it as a future concern — since this PR is specifically targeting fork-safety, recreating the clients in the child would complete the fix.
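One way to recreate a client in the child, sketched under assumptions. `ForkAwareClientHolder` and its `factory` argument are hypothetical; in practice the factory would build an `httpx.Client` with the same settings as the original, but the PR does not show how the clients are constructed, so the sketch stays library-agnostic.

```python
import os
from typing import Callable, Generic, TypeVar

ClientT = TypeVar("ClientT")


class ForkAwareClientHolder(Generic[ClientT]):
    """Hypothetical holder that rebuilds its client in a forked child."""

    def __init__(self, factory: Callable[[], ClientT]) -> None:
        self._factory = factory
        self._client = factory()
        self._owner_pid = os.getpid()

    def reinit_after_fork(self) -> None:
        # Replace the inherited client so the child opens its own connections.
        # The old object is only dropped here; whether it should also be
        # closed in the child is left open in this sketch.
        self._client = self._factory()
        self._owner_pid = os.getpid()

    @property
    def client(self) -> ClientT:
        # Fallback for forks that bypass registered handlers:
        # detect a PID change lazily on first access.
        if os.getpid() != self._owner_pid:
            self.reinit_after_fork()
        return self._client
```

A fork handler such as `_at_fork_reinit` could call `reinit_after_fork()` directly; the PID check only covers the case where the handler never ran.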
…r reinitialization
Signed-off-by: 박영규 <pyg410@naver.com>
What does this PR do?
When using Gunicorn with --preload, os.fork() copies memory but not threads (POSIX.1: https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html). LangfuseResourceManager starts media upload and score ingestion consumer threads on initialization, but these threads are not inherited by forked worker processes. As a result, all media upload and score ingestion events are silently lost, and calling flush() blocks forever on queue.join() → Gunicorn worker timeout (SIGABRT).

Note: span export is already fork-safe via LangfuseSpanProcessor (BatchSpanProcessor inheritance). This fix covers the remaining background threads managed by LangfuseResourceManager.

Related PR

Changes:
- _init_consumer_threads() for reuse
- score_ingestion_client as an instance variable for access in _at_fork_reinit()
- _at_fork_reinit() to reinitialize queues and consumer threads after fork
- os.register_at_fork(after_in_child=...) using weakref.WeakMethod to avoid permanent strong references that would prevent garbage collection
- _shutdown flag to skip reinitialization on already-stopped instances
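The WeakMethod registration listed under Changes can be sketched as follows. `Manager`, `reinit_calls`, and the `_fork_callback` attribute are hypothetical and exist only to make the behaviour observable; the PR's actual wiring may differ.

```python
import os
import weakref


class Manager:
    """Hypothetical stand-in for LangfuseResourceManager."""

    def __init__(self) -> None:
        self.reinit_calls = 0
        # WeakMethod keeps neither the instance nor the bound method alive.
        weak_reinit = weakref.WeakMethod(self._at_fork_reinit)

        def _run_in_child() -> None:
            method = weak_reinit()  # None once the instance has been collected
            if method is not None:
                method()

        # Kept on the instance only so the sketch can invoke it without forking.
        self._fork_callback = _run_in_child
        if hasattr(os, "register_at_fork"):  # not available on every platform
            os.register_at_fork(after_in_child=_run_in_child)

    def _at_fork_reinit(self) -> None:
        self.reinit_calls += 1
```

Registering `self._at_fork_reinit` directly would store a bound method, and with it a strong reference to the instance, in a registry that offers no way to unregister. With the weak wrapper, the leftover callback becomes a no-op once the manager is gone.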
Greptile Summary
This PR fixes silent data loss and flush() deadlocks in Gunicorn --preload deployments by registering an os.register_at_fork(after_in_child=...) handler that reinitializes the media upload and score ingestion consumer threads in each forked worker.
- _init_consumer_threads() is extracted so it can be called both during initial setup and after fork; queues are intentionally recreated in the child to prevent duplicating pre-fork events across workers.
- A WeakMethod-based fork handler prevents the os.register_at_fork registry from holding a strong reference that would block garbage collection, and a _shutdown flag guards against reinitializing already-torn-down instances.
- httpx.Client/LangfuseClient instances are not recreated after fork (acknowledged in comments), leaving inherited connection-pool FDs that could cause initial connection errors in workers; the after_in_child callback also has no exception handling, so a thread-creation failure propagates out of os.fork() and crashes the worker rather than degrading gracefully.

Confidence Score: 3/5
The core fix is sound and addresses a real production issue, but the after_in_child callback has no exception handling, meaning a thread-creation failure would crash the Gunicorn worker rather than falling back gracefully.

The fork-reinitialization logic is correct for the happy path: queues are replaced, old dead thread references are discarded, new consumer threads are started, and the WeakMethod approach is appropriate. The main gap is that _init_consumer_threads() is called directly inside the after_in_child callback without a try/except: if the OS refuses to create a thread at worker startup time, the exception surfaces from os.fork() and kills the worker instead of allowing it to continue with degraded telemetry. The inherited httpx connection pool FDs are an acknowledged limitation that could surface as transient connection errors in workers.

langfuse/_client/resource_manager.py warrants a second look around the _at_fork_reinit method, specifically exception handling and whether the HTTP clients need to be recreated for full fork safety.

Sequence Diagram
sequenceDiagram
    participant Master as Master Process (Gunicorn)
    participant Fork as os.fork()
    participant Parent as Parent Process
    participant Child as Child Worker Process
    Master->>Master: Langfuse() → LangfuseResourceManager.__new__()
    Master->>Master: _initialize_instance()
    Master->>Master: _init_consumer_threads() → start media + score threads
    Master->>Master: "os.register_at_fork(after_in_child=weak_reinit_lambda)"
    Master->>Fork: os.fork()
    Fork-->>Parent: returns child PID (old threads alive)
    Fork-->>Child: returns 0 (threads NOT inherited)
    Child->>Child: after_in_child: weak_reinit() → _at_fork_reinit()
    Child->>Child: _shutdown? → False → proceed
    Child->>Child: _init_consumer_threads()
    Child->>Child: new _media_upload_queue (fresh Queue)
    Child->>Child: new _score_ingestion_queue (fresh Queue)
    Child->>Child: start fresh MediaUploadConsumer threads
    Child->>Child: start fresh ScoreIngestionConsumer thread
    Child->>Child: child ready to handle requests
    Parent->>Parent: continues with original threads unchanged
Reviews (1): Last reviewed commit: "fix(resource_manager): reinitialize cons..."
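The flow in the sequence diagram can be reproduced in miniature on a POSIX system. This is an illustrative sketch, not Langfuse code: `start_consumer` and `fork_and_report` are hypothetical helpers, and the child restarts its consumer inline rather than through a registered handler.

```python
import os
import queue
import threading
import warnings


def start_consumer(q: "queue.Queue[object]") -> threading.Thread:
    """Start a daemon thread that drains q and marks each item done."""

    def run() -> None:
        while True:
            q.get()
            q.task_done()

    t = threading.Thread(target=run, daemon=True)
    t.start()
    return t


def fork_and_report() -> str:
    """Fork once; the child reports consumer liveness before and after reinit."""
    consumer = start_consumer(queue.Queue())
    read_fd, write_fd = os.pipe()
    with warnings.catch_warnings():
        # Newer Python versions warn when a multi-threaded process forks.
        warnings.simplefilter("ignore", DeprecationWarning)
        pid = os.fork()
    if pid == 0:  # child process
        status = 1
        try:
            os.close(read_fd)
            # The thread object was copied, but no thread runs behind it.
            before = consumer.is_alive()
            fresh_queue: "queue.Queue[object]" = queue.Queue()
            fresh = start_consumer(fresh_queue)  # what a reinit handler would do
            fresh_queue.put("event")
            fresh_queue.join()  # returns because a consumer exists again
            os.write(write_fd, f"{before},{fresh.is_alive()}".encode())
            status = 0
        finally:
            os._exit(status)  # skip parent-owned cleanup in the child
    os.close(write_fd)
    with os.fdopen(read_fd) as reader:
        report = reader.read()
    os.waitpid(pid, 0)
    return report
```

On CPython, `fork_and_report()` is expected to return `False,True`: the inherited consumer is reported as not alive in the child, and the freshly started one is. Without the restart, a `join()` on a queue whose consumer did not survive the fork would never return, which is the `flush()` hang described in the PR.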