Skip to content

Fix/7387 storage executor fanout limit#7403

Open
adithya-234 wants to merge 6 commits into
BasedHardware:mainfrom
adithya-234:fix/7387-storage-executor-fanout-limit
Open

Fix/7387 storage executor fanout limit#7403
adithya-234 wants to merge 6 commits into
BasedHardware:mainfrom
adithya-234:fix/7387-storage-executor-fanout-limit

Conversation

@adithya-234
Copy link
Copy Markdown
Contributor

No description provided.

adithya-234 and others added 5 commits May 20, 2026 18:22
Prevents storage_executor (96 workers) from being saturated when
multiple users trigger large audio merges simultaneously. Caps
concurrent chunk downloads at 20 and precache operations at 20.

Fixes BasedHardware#7387

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Limits concurrent storage_executor submissions for audio precaching
to 10 per request, preventing pool saturation during large syncs.

Fixes BasedHardware#7387

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Limits concurrent memory processing in rebuild_knowledge_graph to 10,
preventing storage_executor saturation when rebuilding large graphs.

Fixes BasedHardware#7387

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Validates semaphore concurrency limiting, deadlock prevention,
and AST-level presence of _STORAGE_FANOUT_SEMAPHORE in storage module.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@adithya-234
Copy link
Copy Markdown
Contributor Author

cc @beastoin

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 20, 2026

Greptile Summary

This PR addresses issue #7387 by adding threading.Semaphore guards around fan-out submissions to storage_executor, preventing unbounded queues of GCS operations. The approach in storage.py is correct (module-level _STORAGE_FANOUT_SEMAPHORE shared across all calls), but the semaphores added in sync.py and knowledge_graph.py are created fresh per-request/per-call, so multiple concurrent requests still produce unbounded combined fan-out.

  • storage.py correctly adds a module-level _STORAGE_FANOUT_SEMAPHORE(20) used inside download_audio_chunks_and_merge and precache_conversation_audio.
  • sync.py and knowledge_graph.py each create a new Semaphore(10) per invocation, which only limits concurrency within a single request — concurrent requests bypass each other's semaphores.
  • All three sites submit all futures upfront before any semaphore is acquired, so up to 96 storage_executor threads can park on the semaphore simultaneously, holding worker slots idle.

Confidence Score: 3/5

The change partially addresses the storage executor fan-out problem but leaves the core multi-request scenario unprotected and introduces a thread-parking pattern that can quietly saturate the executor under concurrent load.

The module-level semaphore in storage.py is correctly scoped, but the semaphores added in sync.py and knowledge_graph.py are created per-call, so concurrent requests share no global limit and can still flood storage_executor together. Submitting all futures before any semaphore is held means up to 96 threads can park on the semaphore simultaneously, holding worker slots idle and potentially starving other work on the same executor.

backend/routers/sync.py and backend/utils/llm/knowledge_graph.py need their per-call semaphores replaced with a shared module-level semaphore, and the submission loop should gate on semaphore availability before enqueuing tasks.

Important Files Changed

Filename Overview
backend/utils/other/storage.py Adds module-level _STORAGE_FANOUT_SEMAPHORE(20) correctly shared across all calls; semaphore object is placed between import groups (style issue).
backend/routers/sync.py Introduces per-request _precache_sem and _uncached_sem; these are local to each HTTP handler call and provide no cross-request concurrency bound.
backend/utils/llm/knowledge_graph.py Adds per-call _kg_sem(10) inside rebuild_knowledge_graph; all futures submitted upfront so up to 96 threads can park on the semaphore simultaneously.
backend/tests/unit/test_storage_fanout_limit.py New unit tests verify semaphore mechanics in isolation but do not cover the per-request semaphores in sync.py or knowledge_graph.py.
backend/test.sh Registers the new test file in the unit-test runner; no issues.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[HTTP Request] --> B{sync.py handler}
    B -->|per-request Semaphore 10 created| C[_precache_all_parallel submitted to postprocess_executor]
    C --> D[Submit ALL N tasks to storage_executor at once]
    D --> E{storage_executor 96 threads}
    E -->|up to 10 acquire per-request sem| F[_precache_audio_file]
    E -->|remaining threads block on per-request sem| G[Thread parked - holds worker slot]
    F --> H{download_audio_chunks_and_merge}
    H -->|bounded by module-level sem 20| I[_STORAGE_FANOUT_SEMAPHORE in storage.py]
    I --> J[GCS Download]
    K[2nd concurrent Request] -->|creates NEW independent Semaphore 10| D
    L[knowledge_graph.py rebuild_knowledge_graph] -->|per-call Semaphore 10 created| D
    style G fill:#f99,stroke:#c00
    style K fill:#ffd,stroke:#aa0
    style I fill:#9f9,stroke:#090
Loading

Reviews (1): Last reviewed commit: "chore: add test_storage_fanout_limit to ..." | Re-trigger Greptile

Comment thread backend/routers/sync.py
Comment on lines +205 to +214
_precache_sem = threading.Semaphore(10)

def _precache_all_parallel():
logger.info(f"Pre-caching all {len(audio_files)} audio files for conversation {conversation_id} (parallel)")
futures = [
submit_with_context(storage_executor, _precache_audio_file, uid, conversation_id, af) for af in audio_files
]

def _bounded_precache(af):
with _precache_sem:
return _precache_audio_file(uid, conversation_id, af)

futures = [submit_with_context(storage_executor, _bounded_precache, af) for af in audio_files]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Per-request semaphore provides no cross-request fan-out protection

_precache_sem is allocated fresh inside every HTTP handler invocation. Each concurrent request gets its own independent Semaphore(10), so N simultaneous requests each submitting M audio files contribute N×M tasks to storage_executor with no global ceiling. The same pattern applies to _uncached_sem at line 308. The _STORAGE_FANOUT_SEMAPHORE already defined at module level in storage.py is the correct shared throttle — it should be imported here (or mirrored at module level in sync.py) so that concurrency is bounded across all requests, not just within a single one.

Comment on lines +263 to +269
_kg_sem = threading.Semaphore(10)

def _bounded_process_memory(m):
with _kg_sem:
return process_memory(m)

futures = [storage_executor.submit(_bounded_process_memory, m) for m in memories]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Per-call semaphore and thread-parking anti-pattern

_kg_sem is created fresh on every call to rebuild_knowledge_graph, so concurrent callers each get their own Semaphore(10) with no shared global limit. Additionally, all futures are submitted to storage_executor (96 threads) before any semaphore is acquired. When len(memories) > 10, up to 96 threads start executing _bounded_process_memory, and 86+ of them immediately block on _kg_sem, occupying worker slots while doing no useful work. This can starve unrelated tasks queued on the same executor.

Comment on lines +13 to 15
_STORAGE_FANOUT_SEMAPHORE = threading.Semaphore(20)

import opuslib
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Module-level object initialized between import blocks

_STORAGE_FANOUT_SEMAPHORE is instantiated between the internal-package imports (from utils.executors import ...) and the third-party library imports (import opuslib). Moving it after all imports are complete keeps the standard stdlib → third-party → local import ordering intact.

Comment on lines +80 to +95
Uses ast parsing to avoid importing heavy native deps (opuslib, GCS).
"""
import ast
import pathlib

storage_path = pathlib.Path(__file__).resolve().parents[2] / 'utils' / 'other' / 'storage.py'
tree = ast.parse(storage_path.read_text())

semaphore_found = False
semaphore_value = None
for node in ast.walk(tree):
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == '_STORAGE_FANOUT_SEMAPHORE':
semaphore_found = True
if isinstance(node.value, ast.Call) and node.value.args:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Test validates a symbol that doesn't guard the main call sites

test_storage_module_has_semaphore confirms that _STORAGE_FANOUT_SEMAPHORE exists in storage.py and its value is in [5, 30]. However, the per-request semaphores introduced in sync.py (_precache_sem, _uncached_sem) and knowledge_graph.py (_kg_sem) are different objects created at call time, and no test verifies that those sites respect any global limit or share a semaphore. The suite therefore passes while the actual cross-request unbounded fan-out problem remains untested.

Copy link
Copy Markdown
Collaborator

@kodjima33 kodjima33 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks — storage executor fanout cap looks right

@adithya-234
Copy link
Copy Markdown
Contributor Author

@kodjima33 been following omi for some time, and created other pr's and one more pr that was merged earlier, would love to join the team kindly let me know

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants