Fix/7387 storage executor fanout limit#7403
Conversation
Prevents storage_executor (96 workers) from being saturated when multiple users trigger large audio merges simultaneously. Caps concurrent chunk downloads at 20 and precache operations at 20. Fixes BasedHardware#7387 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Limits concurrent storage_executor submissions for audio precaching to 10 per request, preventing pool saturation during large syncs. Fixes BasedHardware#7387 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Limits concurrent memory processing in rebuild_knowledge_graph to 10, preventing storage_executor saturation when rebuilding large graphs. Fixes BasedHardware#7387 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Validates semaphore concurrency limiting, deadlock prevention, and AST-level presence of _STORAGE_FANOUT_SEMAPHORE in storage module. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
cc @beastoin |
Greptile SummaryThis PR addresses issue #7387 by adding
Confidence Score: 3/5The change partially addresses the storage executor fan-out problem but leaves the core multi-request scenario unprotected and introduces a thread-parking pattern that can quietly saturate the executor under concurrent load. The module-level semaphore in storage.py is correctly scoped, but the semaphores added in sync.py and knowledge_graph.py are created per-call, so concurrent requests share no global limit and can still flood storage_executor together. Submitting all futures before any semaphore is held means up to 96 threads can park on the semaphore simultaneously, holding worker slots idle and potentially starving other work on the same executor. backend/routers/sync.py and backend/utils/llm/knowledge_graph.py need their per-call semaphores replaced with a shared module-level semaphore, and the submission loop should gate on semaphore availability before enqueuing tasks. Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[HTTP Request] --> B{sync.py handler}
B -->|per-request Semaphore 10 created| C[_precache_all_parallel submitted to postprocess_executor]
C --> D[Submit ALL N tasks to storage_executor at once]
D --> E{storage_executor 96 threads}
E -->|up to 10 acquire per-request sem| F[_precache_audio_file]
E -->|remaining threads block on per-request sem| G[Thread parked - holds worker slot]
F --> H{download_audio_chunks_and_merge}
H -->|bounded by module-level sem 20| I[_STORAGE_FANOUT_SEMAPHORE in storage.py]
I --> J[GCS Download]
K[2nd concurrent Request] -->|creates NEW independent Semaphore 10| D
L[knowledge_graph.py rebuild_knowledge_graph] -->|per-call Semaphore 10 created| D
style G fill:#f99,stroke:#c00
style K fill:#ffd,stroke:#aa0
style I fill:#9f9,stroke:#090
Reviews (1): Last reviewed commit: "chore: add test_storage_fanout_limit to ..." | Re-trigger Greptile |
| _precache_sem = threading.Semaphore(10) | ||
|
|
||
| def _precache_all_parallel(): | ||
| logger.info(f"Pre-caching all {len(audio_files)} audio files for conversation {conversation_id} (parallel)") | ||
| futures = [ | ||
| submit_with_context(storage_executor, _precache_audio_file, uid, conversation_id, af) for af in audio_files | ||
| ] | ||
|
|
||
| def _bounded_precache(af): | ||
| with _precache_sem: | ||
| return _precache_audio_file(uid, conversation_id, af) | ||
|
|
||
| futures = [submit_with_context(storage_executor, _bounded_precache, af) for af in audio_files] |
There was a problem hiding this comment.
Per-request semaphore provides no cross-request fan-out protection
_precache_sem is allocated fresh inside every HTTP handler invocation. Each concurrent request gets its own independent Semaphore(10), so N simultaneous requests each submitting M audio files contribute N×M tasks to storage_executor with no global ceiling. The same pattern applies to _uncached_sem at line 308. The _STORAGE_FANOUT_SEMAPHORE already defined at module level in storage.py is the correct shared throttle — it should be imported here (or mirrored at module level in sync.py) so that concurrency is bounded across all requests, not just within a single one.
| _kg_sem = threading.Semaphore(10) | ||
|
|
||
| def _bounded_process_memory(m): | ||
| with _kg_sem: | ||
| return process_memory(m) | ||
|
|
||
| futures = [storage_executor.submit(_bounded_process_memory, m) for m in memories] |
There was a problem hiding this comment.
Per-call semaphore and thread-parking anti-pattern
_kg_sem is created fresh on every call to rebuild_knowledge_graph, so concurrent callers each get their own Semaphore(10) with no shared global limit. Additionally, all futures are submitted to storage_executor (96 threads) before any semaphore is acquired. When len(memories) > 10, up to 96 threads start executing _bounded_process_memory, and 86+ of them immediately block on _kg_sem, occupying worker slots while doing no useful work. This can starve unrelated tasks queued on the same executor.
| _STORAGE_FANOUT_SEMAPHORE = threading.Semaphore(20) | ||
|
|
||
| import opuslib |
There was a problem hiding this comment.
Module-level object initialized between import blocks
_STORAGE_FANOUT_SEMAPHORE is instantiated between the internal-package imports (from utils.executors import ...) and the third-party library imports (import opuslib). Moving it after all imports are complete keeps the standard stdlib → third-party → local import ordering intact.
| Uses ast parsing to avoid importing heavy native deps (opuslib, GCS). | ||
| """ | ||
| import ast | ||
| import pathlib | ||
|
|
||
| storage_path = pathlib.Path(__file__).resolve().parents[2] / 'utils' / 'other' / 'storage.py' | ||
| tree = ast.parse(storage_path.read_text()) | ||
|
|
||
| semaphore_found = False | ||
| semaphore_value = None | ||
| for node in ast.walk(tree): | ||
| if isinstance(node, ast.Assign): | ||
| for target in node.targets: | ||
| if isinstance(target, ast.Name) and target.id == '_STORAGE_FANOUT_SEMAPHORE': | ||
| semaphore_found = True | ||
| if isinstance(node.value, ast.Call) and node.value.args: |
There was a problem hiding this comment.
Test validates a symbol that doesn't guard the main call sites
test_storage_module_has_semaphore confirms that _STORAGE_FANOUT_SEMAPHORE exists in storage.py and its value is in [5, 30]. However, the per-request semaphores introduced in sync.py (_precache_sem, _uncached_sem) and knowledge_graph.py (_kg_sem) are different objects created at call time, and no test verifies that those sites respect any global limit or share a semaphore. The suite therefore passes while the actual cross-request unbounded fan-out problem remains untested.
kodjima33
left a comment
There was a problem hiding this comment.
thanks — storage executor fanout cap looks right
|
@kodjima33 been following omi for some time, and created other pr's and one more pr that was merged earlier, would love to join the team kindly let me know |
No description provided.