HealDA IO Improvement: Parallel decode + Obstore #914
Swap UFSObsConv/UFSObsSat per-file fetch from s3fs._cat_file to obstore get_range_async/get_async on a cached anonymous per-bucket S3Store. Removes the s3fs filesystem/session plumbing; preserves cache paths, byte-range semantics, and missing-file (404) handling. Adds obstore dependency (s3fs kept for other data sources). Speeds up the HealDA obs fetch (the ~90% data-path cost): cold obs fetch 49.4s -> 38s, end-to-end HealDA pipeline -22% on GB200 (cross-cloud to NOAA S3), identical analysis output.
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Decode each GSI file's HDF5->DataFrame across forked workers, on top of the obstore fetch. Worker count is chosen automatically: min(available_cpus, 16, n_files). Falls back to serial decode if a CUDA context already exists in the process (fork-after-CUDA is unsafe). There is no configuration knob; the serial path's behavior is unchanged when only one worker is selected. Decode is ~5x faster; HealDA e2e drops further on top of the obstore fetch.
Signed-off-by: Negin Sobhani <nsobhani@nvidia.com>
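The worker-count rule in the commit message above can be sketched as follows. This is an illustration under assumptions, not the PR's code: `available_cpus` and `choose_decode_workers` are hypothetical names, and the CUDA check is passed in as a plain boolean rather than detected.

```python
import os


def available_cpus() -> int:
    """CPUs this process may run on (affinity-aware where the OS supports it)."""
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1


def choose_decode_workers(n_files: int, cuda_active: bool, cap: int = 16) -> int:
    """Hypothetical helper: min(available_cpus, cap, n_files), serial if CUDA is live."""
    if cuda_active or n_files <= 1:
        # Fork-after-CUDA is unsafe, and one file gains nothing from a pool.
        return 1
    return max(1, min(available_cpus(), cap, n_files))
```

Using the affinity mask rather than the raw core count keeps the pool within container or scheduler CPU limits on platforms that expose it.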
Greptile Summary
This PR replaces
| Filename | Overview |
|---|---|
| earth2studio/data/ufs.py | Replaces s3fs with obstore for S3 I/O and adds fork-based parallel decode via ProcessPoolExecutor; has a macOS safety-guard bug and a thread-unsafe module-level context dict that could produce wrong results under concurrent use. |
| pyproject.toml | Adds obstore>=0.8 as a core dependency; s3fs is no longer imported in ufs.py but is not removed from dependencies — verify whether it's still needed elsewhere. |
Reviews (1): Last reviewed commit: "data/ufs: parallelize NetCDF decode acro..."
    if workers > 1 and (
        "fork" not in multiprocessing.get_all_start_methods()
        or _cuda_initialized()
    ):
        # Parallel decode requires the 'fork' start method (not available on
        # Windows; unsafe on macOS). Also unsafe once CUDA is initialized.
        # In either case fall back to serial decode.
        workers = 1
macOS safety guard does not cover macOS
The condition "fork" not in multiprocessing.get_all_start_methods() evaluates to False on macOS because fork is technically in the list of available methods — it just isn't the safe/default one. The code comment explicitly states "unsafe on macOS … fall back to serial decode," but the guard never triggers on macOS, and line 300 unconditionally creates the executor with get_context("fork"). On macOS this will proceed with fork-after-threads, risking deadlocks or corrupted state from Objective-C/Core Foundation globals inherited by workers. The fix is to also check sys.platform == "darwin" (or compare against multiprocessing.get_start_method(), which defaults to "spawn" on macOS) before allowing parallel decode.
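One way to express the stricter guard described in this comment is sketched below. The helper name `fork_decode_is_safe` is hypothetical, and the CUDA state is passed in as a boolean so the example stays self-contained; the PR's own `_cuda_initialized()` is not reproduced here.

```python
import multiprocessing
import sys


def fork_decode_is_safe(cuda_initialized: bool) -> bool:
    """Return True only when forked decode workers are safe to use."""
    if cuda_initialized:
        # Forking after a CUDA context exists is unsafe.
        return False
    if sys.platform == "darwin":
        # 'fork' is listed as available on macOS but is not safe there.
        return False
    # Windows does not offer 'fork' at all.
    return "fork" in multiprocessing.get_all_start_methods()
```

A caller would then set `workers = 1` whenever this returns `False`, which makes the behavior match the code comment's stated intent.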
    _DECODE_CTX["args"] = (self, chunks, variables, schema)
    try:
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=len(chunks),
            mp_context=multiprocessing.get_context("fork"),
        ) as executor:
            parts = list(executor.map(_decode_chunk_idx, range(len(chunks))))
    finally:
        _DECODE_CTX.clear()
Thread-unsafe module-level global _DECODE_CTX
_DECODE_CTX["args"] is written on line 296 and the forked workers inherit it via copy-on-write. If two calls to _compile_dataframe are in-flight simultaneously — e.g., when _sync_async dispatches work across threads or when two separate instances are driven concurrently — Thread A's write can be overwritten by Thread B before A's workers are forked. Workers from Thread A would then process Thread B's tasks, silently producing wrong results or raising index/shape errors. Even with the GIL protecting individual dict operations, the three-step sequence (write global → fork workers → workers read global) is not atomic. Consider replacing the global with a multiprocessing.Manager shared value, threading locks around the critical section, or — simplest — passing the context as pickled init args to the pool initializer instead of relying on global inheritance.
    except (FileNotFoundError, obs.exceptions.NotFoundError):
        self._handle_missing_file(path)
    except Exception as err:
        raise
The except Exception as err: raise block does nothing: it catches the exception and immediately re-raises it with no logging, wrapping, or side effects. Remove it so unhandled exceptions propagate naturally from the try block.
Suggested change:

    - except (FileNotFoundError, obs.exceptions.NotFoundError):
    -     self._handle_missing_file(path)
    - except Exception as err:
    -     raise
    + except (FileNotFoundError, obs.exceptions.NotFoundError):
    +     self._handle_missing_file(path)
Earth2Studio Pull Request
Description
This PR adds the second HealDA data-path improvement: parallelizing the NetCDF→DataFrame decode of the UFS GSI observation files. After #913, the remaining cost is the per-file HDF5→pandas decode, which is CPU- and GIL-bound and runs single-threaded.
UFSObsConv/UFSObsSat read hundreds of diag_*.nc4 files per analysis; each is parsed (h5netcdf), char→string converted, channel-index expanded, and built into a DataFrame. This PR fans that per-file work across forked worker processes.
Checklist
Dependencies