diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index a0b1fa8..b7dea97 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -1,113 +1,78 @@ -# gwtransport +# python-dawaco-tools -Scientific Python package for timeseries analysis of groundwater transport of solutes and heat. +Python tools for reading and analysing DAWACO groundwater monitoring data. -## Commands +## Repository layout -```bash -# Setup (fresh environment, separate from user's .venv) -# Windows: replace `env VAR=val cmd` with `set "VAR=val" && cmd` -rm -rf .venv-claude -env UV_PROJECT_ENVIRONMENT=.venv-claude uv sync --all-extras -q -git config core.hooksPath .githooks # Enable pre-commit hook +- `dawacotools\` -- importable package code. +- `tests\` -- pytest suite, including the synthetic SQLite DAWACO database builder in `tests\mock_dawaco.py`. +- `workflows\` -- project-specific scripts and notebooks/workflows. Treat these as data workflows, not package tests. +- `.github\workflows\` -- CI for Windows functional tests, linting, type checking, metadata validation, and Prettier. +- `tutorials\` -- user-facing examples. -# Testing (run before committing) -env UV_PROJECT_ENVIRONMENT=.venv-claude uv run -q pytest tests/src -n auto # Unit tests -env UV_PROJECT_ENVIRONMENT=.venv-claude uv run -q pytest tests/examples -n auto # Example notebooks -env UV_PROJECT_ENVIRONMENT=.venv-claude uv run -q pytest tests/docs -n auto # Documentation code snippets +## Development commands -# Linting (run before committing) -env UV_PROJECT_ENVIRONMENT=.venv-claude uv run -q ruff format . # Format code -env UV_PROJECT_ENVIRONMENT=.venv-claude uv run -q ruff check --fix . # Lint and auto-fix -npx prettier --check "**/*.{yaml,yml,md}" # Format markdown/yaml +Use a task-local uv environment rather than a user's active virtual environment: -# Type checking (run before committing) -uv tool update -q ty & uv tool run -q ty check . - -# Documentation -uv tool run -q --from sphinx --with-editable ".[docs]" sphinx-build -j auto -b linkcheck docs/source docs/build/linkcheck -rm -rf docs/build && uv tool run -q --from sphinx --with-editable ".[docs]" sphinx-build -j 1 -b html docs/source docs/build/html +```powershell +$env:UV_PROJECT_ENVIRONMENT = ".venv-claude" +uv sync --extra test -q ``` -## CI/CD - -All checks must pass before merging. Pipeline tests on Python 3.12 (minimum deps) and 3.14 (latest deps). See `.github/workflows/` for details. - -## Project Layout - -- `src/gwtransport/` -- Package source code -- `tests/src/` -- Unit tests (one test file per module) -- `tests/examples/` -- Jupyter notebook execution tests -- `tests/docs/` -- Documentation code snippet tests -- `examples/` -- Example Jupyter notebooks -- `docs/source/` -- Sphinx documentation source - -## Philosophy - -You are a quality gatekeeper, not just an implementer. Before writing code: +Run the default CI-safe tests against the synthetic DAWACO database: -- **Understand the physics.** This is a scientific package -- correctness of physical equations, units, and boundary conditions matters more than code elegance. If unsure about the physics, ask. -- **Check for dead code.** After every change, verify no unused imports, functions, or variables remain. Remove them. -- **Keep API and docs consistent.** When changing a public function signature, update its docstring, any cross-references (see `docs/CROSS_REFERENCES.md`), and affected example notebooks. -- **Re-read the request.** Before finishing, re-read the original question to verify you actually answered it. - -## Code Style +```powershell +uv run pytest tests +``` -- **Docstrings**: NumPy style. See example below. -- **Line length**: 120 characters. -- **Type hints**: Required for all public functions. Use `npt.ArrayLike` for array inputs, `npt.NDArray[np.floating]` for array outputs, `pd.DatetimeIndex` for time edges. Use built-in Python generics (`list`, `tuple`, `dict`, `X | None`) -- NEVER import from `typing`. -- **Vectorization**: ALWAYS prefer vectorized NumPy/SciPy/pandas operations over Python for-loops. If you find yourself writing a loop over array elements, stop and find the vectorized equivalent. -- **Formatting**: Enforced by linting with ruff and prettier. Do not fight the formatter. -- **Parameter names**: Use descriptive names consistent with domain conventions. For example, use `flow` for flow rate, `cin`/`cout` for concentrations, `tedges` for time edges, `xedges` for spatial edges. +Run the configured Python checks: -```python -def function_name(*, flow: npt.ArrayLike, tedges: pd.DatetimeIndex) -> npt.NDArray[np.floating]: - """Short description. +```powershell +uv run ruff format --diff dawacotools tests +uv run ruff check dawacotools tests +uv run ty check dawacotools tests --ignore unused-ignore-comment +uv run validate-pyproject pyproject.toml +``` - Parameters - ---------- - flow : array-like - Flow rate (m³/day). - tedges : DatetimeIndex - Time bin edges (n+1 edges for n values). +Ruff and ty are scoped to the package and tests. Package code has no package-wide ruff baseline; only tests keep test-specific ignores for docstrings, magic constants, and asserts. - Returns - ------- - ndarray - Result description. +Markdown and YAML formatting is checked with a pinned Prettier version through `npx`; Node.js/npm must be +available: - See Also - -------- - related_function : Brief description. - :ref:`concept-residence-time` : Background on the concept. - """ +```powershell +npx --yes prettier@3.8.3 --check "**/*.{yaml,yml,md}" ``` -## Domain Conventions +CI also verifies the lowest direct test dependencies on Python 3.12: -**IMPORTANT**: These conventions are load-bearing for correctness. - -- **Bin-edge pattern**: Time is represented as `tedges` (`pd.DatetimeIndex`, n+1 edges) with n values constant over each interval `[tedges[i], tedges[i+1])`. Same pattern for spatial dimension (`xedges`). -- **Input/output semantics**: Input values (`flow`, `cin` for infiltration-to-extraction; `flow`, `cout` for extraction-to-infiltration) are constant per bin. Output concentration/temperature is a flow-weighted bin average. -- **Paired operations**: Functions come in forward (infiltration-to-extraction) and reverse (extraction-to-infiltration) variants. -- **Multiple parameterizations**: Gamma distributions support both (alpha, beta, loc) and (mean, std, loc). -- **Retardation**: All transport functions account for sorption via retardation factors. -- **Units**: Must be consistent within a calculation. The package does not enforce units -- the user is responsible. +```powershell +uv sync --extra test --resolution lowest-direct --python 3.12 +uv run pytest tests -n0 +``` -## Testing +## DAWACO database handling -- Use fixtures from `tests/src/conftest.py` for common test data. -- Tests MUST be exact to machine precision. Use `np.testing.assert_allclose(actual, expected)`. -- Validate physical correctness: conservation laws, boundary conditions, limiting cases. -- Tests MUST be meaningful -- not trivial identity checks. Use analytical solutions for validation when possible. -- Run specific tests with: `uv run pytest tests/src/test_diffusion.py -v` or `uv run pytest tests/src -k "advection" -v` -- LLM reviewers (including this one) routinely flag plausible-sounding bugs that do not actually exist; before applying any non-trivial suggested change, write a regression test for the claimed bug and run it against the unchanged baseline -- only proceed if the baseline test fails. +- Default tests must use the fully synthetic SQLite database created by `tests\mock_dawaco.py`. +- Synthetic test data may be committed only when it is fabricated and safe for public CI. +- Private mock databases may be used only through `DAWACOTOOLS_PRIVATE_DATABASE_URL` or + `DAWACOTOOLS_DATABASE_URL` plus `uv run pytest tests --run-private-db -m private_db -n0`. +- Live DAWACO smoke tests require explicit opt-in with `--run-live-db` or `DAWACOTOOLS_RUN_LIVE_DB=1` and local + credentials/environment such as `DAWACOTOOLS_LIVE_MPCODE` and `DAWACOTOOLS_LIVE_FILTER`. +- Never run private or live database tests in CI unless a workflow is explicitly designed for protected secrets. -## Git +## Data safety -- Do NOT include Claude-related and copilot-related signatures in commit messages or PR descriptions. Nor use any emoticons. Keep messages professional and focused on the technical content. -- Run formatting, linting, type checking, and ask all reviewers for approval before committing. +- Do not commit production DAWACO exports, private mock databases, connection strings, credentials, tokens, or + identifiable monitoring values copied from production. +- Keep generated `.db`, `.sqlite`, `.sqlite3`, `.sql`, `.gpkg`, `.geojson`, `.feather`, `.parquet`, images, and + archives out of git; `.gitignore` is configured for these data/export formats. +- Tests for private data must assert schema, shape, and data-quality invariants only; do not encode real DAWACO + values in tests, docs, or examples. +- Be careful with `workflows\` scripts: many are local/project analyses and may read or produce private data. -## Cross-References +## Coding guidance -See `docs/CROSS_REFERENCES.md` for available Sphinx labels (concepts, assumptions, examples) and syntax for linking from docstrings vs notebooks. +- Keep changes scoped. Do not perform broad package lint, type, or API cleanup unless it is required for the task. +- Prefer small, well-named tests in `tests\` for package behavior changes. +- Public APIs should have type hints and clear docstrings, but avoid reshaping unrelated legacy code. +- Re-read the request before finishing and run the relevant configured checks. diff --git a/.github/workflows/linting.yml b/.github/workflows/linting.yml index 95a4474..f79e1ee 100644 --- a/.github/workflows/linting.yml +++ b/.github/workflows/linting.yml @@ -26,17 +26,17 @@ jobs: uv run ruff --version uv run ruff format --diff dawacotools tests - name: Run ruff - run: uv run ruff check tests dawacotools\__init__.py + run: uv run ruff check dawacotools tests - name: Type checking run: | uv run ty --version - uv run ty check tests --output-format github --ignore unused-ignore-comment + uv run ty check dawacotools tests --output-format github --ignore unused-ignore-comment - name: Validate project metadata run: uv run validate-pyproject pyproject.toml - name: Prettier - run: npx prettier --check "**/*.{yaml,yml,md}" + run: npx --yes prettier@3.8.3 --check "**/*.{yaml,yml,md}" - name: Prettier show diff if: ${{ failure() }} run: | - npx prettier --write "**/*.{yaml,yml,md}" + npx --yes prettier@3.8.3 --write "**/*.{yaml,yml,md}" git diff diff --git a/README.md b/README.md index be9c9f9..04ccc07 100644 --- a/README.md +++ b/README.md @@ -38,12 +38,17 @@ $env:UV_PROJECT_ENVIRONMENT = ".venv-claude" uv sync --extra test ``` +Markdown/YAML formatting uses Prettier through `npx`; install Node.js/npm before running the Prettier command below. +The pinned Prettier version keeps local checks aligned with CI. + Run the CI-safe test suite against the synthetic SQLite DAWACO database: ```powershell uv run pytest tests ``` +The CI workflow does not execute tutorial notebooks; validate tutorial changes manually. + Run a single test: ```powershell @@ -54,12 +59,14 @@ Run linting and formatting checks: ```powershell uv run ruff format --diff dawacotools tests -uv run ruff check tests dawacotools\__init__.py -uv run ty check tests +uv run ruff check dawacotools tests +uv run ty check dawacotools tests --ignore unused-ignore-comment uv run validate-pyproject pyproject.toml -npx prettier --check "**/*.{yaml,yml,md}" +npx --yes prettier@3.8.3 --check "**/*.{yaml,yml,md}" ``` +Ruff and ty run over the package and tests. Package code has no package-wide ruff baseline; only tests keep test-specific ignores for docstrings, magic constants, and asserts. + The default tests build a fully synthetic SQLite database in pytest's temporary directory. These rows are fabricated and safe for CI. Do not commit exports or samples from the production DAWACO database. Local database and geospatial export files are ignored so private mock databases generated from production data stay out of git. To create the same fabricated CI database as a persistent local SQLite file: diff --git a/dawacotools/analysis.py b/dawacotools/analysis.py index 5df49fc..fdb93d4 100644 --- a/dawacotools/analysis.py +++ b/dawacotools/analysis.py @@ -16,7 +16,7 @@ ) -def remove_outliers(data, threshold=3.0): +def remove_outliers(data: pd.Series, threshold: float = 3.0) -> pd.Series: """Remove outliers from a pandas.Series. Outliers are defined as values that are more than threshold standard deviations away from the mean. @@ -38,52 +38,95 @@ def remove_outliers(data, threshold=3.0): def compute_residence_time( - flow, - pore_volume_reservoir=None, - average_residence_time=None, - extraction_infiltration="extraction", - retardation_factor=1.0, -): + flow: pd.Series, + pore_volume_reservoir: float | None = None, + average_residence_time: float | None = None, + extraction_infiltration: str = "extraction", + retardation_factor: float = 1.0, +) -> pd.Series: """Compute the residence time of the water extracted from a plug-flow reservoir. - The residence time is computed from the historic flows through a reservoir of a given volume. Either - provide the pore_volume_reservoir or the average_residence_time parameters. + The residence time is computed from historic flow rates through a reservoir of a given volume. The + cumulative transported volume is derived from elapsed days in ``flow.index``; therefore ``flow`` values + must be volumetric rates per day (for example m³/day). Convert rates in other time units before calling + this function, e.g. multiply m³/hour by 24. IKIEF 9100: >> pore_volume_reservoir = 2 * lengte_strang * afstand_pand_put / porositeit >> 128571 m3 = 2 * 500 * 45 / 0.35 - Given a certain volume of the reservoir, the residence time - Parameters ---------- flow : pandas.Series - The flow in the reservoir in m3/h. + Volumetric flow rate through the reservoir in m³/day. The index must be a strictly increasing + pandas.DatetimeIndex; flow is linearly interpolated between timestamps and integrated over elapsed days. pore_volume_reservoir : float, optional - The pore volume of the reservoir in m3. If not given, it is computed as the product of the porosity and the volume of the reservoir. + The pore volume of the reservoir in the same volume unit as ``flow``. Provide either this value or + ``average_residence_time``. average_residence_time : float, optional - The average residence time in days. If no pore_volume_reservoir is given, the pore volume is computed as the product of the average residence time and the mean flow. + The average residence time in days. If no ``pore_volume_reservoir`` is given, the pore volume is computed + as the product of the average residence time and the time-weighted mean flow. extraction_infiltration : str, optional - The type of flow. Either 'extraction' or 'infiltration'. Extraction refers to backward modeling: how many days ago did this extracted water infiltrate. Infiltration refers to forward modeling: how many days will it take for this infiltrated water to be extracted. + The type of flow. Either 'extraction' or 'infiltration'. Extraction refers to backward modeling: how many + days ago did this extracted water infiltrate. Infiltration refers to forward modeling: how many days will it + take for this infiltrated water to be extracted. Default is 'extraction'. + retardation_factor : float, optional + Multiplicative factor for solute retardation relative to water movement. Default is 1. Returns ------- pandas.Series The residence time in days. """ - if pore_volume_reservoir is None and average_residence_time is not None: - pore_volume_reservoir = average_residence_time * flow.mean() + if pore_volume_reservoir is None and average_residence_time is None: + msg = "Provide either pore_volume_reservoir or average_residence_time" + raise ValueError(msg) + + if not isinstance(flow.index, pd.DatetimeIndex): + msg = "flow must be indexed by a pandas.DatetimeIndex" + raise TypeError(msg) - cum_flow_val = integrate.cumulative_simpson(y=flow.values, dx=1, initial=0.0) # m3 - interp_cum_flow_nu = interpolate.interp1d(cum_flow_val, flow.index, fill_value="extrapolate") + minimum_timestamps = 2 + if len(flow.index) < minimum_timestamps: + msg = "flow must contain at least two timestamps" + raise ValueError(msg) + + elapsed_days = ((flow.index - flow.index[0]) / pd.to_timedelta(1, unit="D")).to_numpy(dtype=float) + if not np.all(np.diff(elapsed_days) > 0.0): + msg = "flow.index must be strictly increasing" + raise ValueError(msg) + + flow_values = flow.to_numpy(dtype=float) + cum_flow_val = integrate.cumulative_trapezoid(y=flow_values, x=elapsed_days, initial=0.0) + + if not np.all(np.diff(cum_flow_val) > 0.0): + msg = "Cumulative flow must be strictly increasing; use positive flow rates through the reservoir" + raise ValueError(msg) + + if pore_volume_reservoir is None: + if average_residence_time is None: + msg = "Provide either pore_volume_reservoir or average_residence_time" + raise ValueError(msg) + mean_flow = cum_flow_val[-1] / elapsed_days[-1] + pore_volume = average_residence_time * mean_flow + else: + pore_volume = pore_volume_reservoir + + transport_volume = pore_volume * retardation_factor + elapsed_at_cumulative_flow = interpolate.interp1d( + cum_flow_val, + elapsed_days, + fill_value="extrapolate", + assume_sorted=True, + ) if extraction_infiltration == "extraction": - toen = pd.to_datetime(interp_cum_flow_nu(cum_flow_val - pore_volume_reservoir * retardation_factor)) - residence_time = (flow.index - toen) / pd.to_timedelta(1, unit="D") + infiltration_time = elapsed_at_cumulative_flow(cum_flow_val - transport_volume) + residence_time = elapsed_days - infiltration_time elif extraction_infiltration == "infiltration": - dan = pd.to_datetime(interp_cum_flow_nu(cum_flow_val + pore_volume_reservoir * retardation_factor)) - residence_time = (dan - flow.index) / pd.to_timedelta(1, unit="D") + extraction_time = elapsed_at_cumulative_flow(cum_flow_val + transport_volume) + residence_time = extraction_time - elapsed_days else: msg = "extraction_infiltration should be 'extraction' or 'infiltration'" raise ValueError(msg) @@ -200,8 +243,6 @@ def get_well_info(mpcode=None, filternr=None): """Filter and MP data""" out["filter_metadata"] = get_daw_filters( mpcode=mpcode, - mv=True, - betrouwbaarheid=False, filternr=filternr, partial_match_mpcode=True, ) @@ -209,9 +250,10 @@ def get_well_info(mpcode=None, filternr=None): msg = "Better specify mpcode and filter number" raise ValueError(msg) - mpcode = out["filter_metadata"].iloc[0].FiltMpCode - filternr = out["filter_metadata"].iloc[0].Filtnr - x, y = out["filter_metadata"].iloc[0].Xcoor, out["filter_metadata"].iloc[0].Ycoor + filter_metadata = out["filter_metadata"].iloc[0] + mpcode = filter_metadata["MpCode"] + filternr = filter_metadata["Filtnr"] + x, y = filter_metadata["Xcoor"], filter_metadata["Ycoor"] """Groundwater levels""" out["gws"] = get_daw_ts_stijghgt(mpcode=mpcode, filternr=filternr) @@ -266,7 +308,7 @@ def get_cluster_mps(mps, r_max=1.5, min_cluster_size=2): # Find all points within r_max i, j = np.where(dist < r_max) collections = {i: [] for i in range(len(x))} - for ii, jj in zip(i, j): + for ii, jj in zip(i, j, strict=False): collections[ii].append(jj) # get unique collections larger than 1 diff --git a/dawacotools/colors.py b/dawacotools/colors.py index 88cb070..cca0028 100644 --- a/dawacotools/colors.py +++ b/dawacotools/colors.py @@ -1,6 +1,20 @@ """Color schemes and legends for geological and hydrological data visualization.""" -boorlegenda_dawaco = { +from typing import TypedDict + +type RgbColor = tuple[int, int, int] + + +class BoorLegendEntry(TypedDict): + """Color, hatch, and line-width settings for DAWACO boring components.""" + + hatch: dict[int, str] + lw: dict[int, float] + fc: dict[int, RgbColor] + idefault: int + + +boorlegenda_dawaco: dict[str, BoorLegendEntry] = { "Z": { # Zand "hatch": {3: ".."}, "lw": {1: 0.5, 2: 0.8, 3: 1.1, 4: 1.4, 5: 2, 6: 2.8}, @@ -52,7 +66,7 @@ }, } -regis_colors = { +regis_colors: dict[str, RgbColor] = { "NUHLc": (0, 128, 0), "NUBXz1": (255, 255, 175), "NUBXSCk1": (0, 190, 0), @@ -187,7 +201,7 @@ "AKc": (0, 128, 0), } -tno_colors = { +tno_colors: dict[str, RgbColor] = { "NUHLc": (12, 129, 12), "NUBXz1": (255, 235, 0), "NUBXSCk1": (215, 175, 0), diff --git a/dawacotools/io.py b/dawacotools/io.py index 7925d82..2109449 100644 --- a/dawacotools/io.py +++ b/dawacotools/io.py @@ -2,15 +2,18 @@ from __future__ import annotations +import datetime as dt import os -from collections.abc import Iterable +import re +import warnings +from collections.abc import Iterable, Sequence -import geopandas as gpd +import geopandas import hydropandas as hpd import numpy as np import pandas as pd import xarray as xr -from sqlalchemy import create_engine +from sqlalchemy import create_engine, text from sqlalchemy.engine import URL, Engine DEFAULT_DRIVER = "ODBC Driver 18 for SQL Server" @@ -23,6 +26,12 @@ CONNECTION_STRING_ENV_VAR = "DAWACOTOOLS_CONNECTION_STRING" AUTHENTICATION_ENV_VAR = "DAWACOTOOLS_ODBC_AUTHENTICATION" SCHEMA_ENV_VAR = "DAWACOTOOLS_DB_SCHEMA" +MIN_TRIWACO_LAYER_THICKNESS_M = 0.01 +METEO_MISSING_VALUE = -99.0 +MIN_VALID_HEAD_NAP = -60.0 +MIN_VALID_TEMPERATURE_C = -60.0 +MIN_SERIES_LENGTH_FOR_GAP_FILL = 2 +_VALID_SQL_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?$") def create_connection_string(authentication: str | None = None) -> str: @@ -60,7 +69,7 @@ def get_connection_string() -> str: connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": connection_string}) -def create_dawaco_engine(database_url: str | None = None, echo: bool = False) -> Engine: +def create_dawaco_engine(database_url: str | None = None, *, echo: bool = False) -> Engine: """Create a SQLAlchemy engine for DAWACO or a configured test database.""" if database_url is None: database_url = os.environ.get(DATABASE_URL_ENV_VAR) @@ -74,37 +83,104 @@ def create_dawaco_engine(database_url: str | None = None, echo: bool = False) -> def get_engine() -> Engine: """Return the configured SQLAlchemy engine, creating the DAWACO engine lazily.""" - global engine + current_engine = engine + if current_engine is None: + current_engine = create_dawaco_engine() + globals()["engine"] = current_engine - if engine is None: - engine = create_dawaco_engine() - - return engine + return current_engine def set_database_engine(database_engine: Engine | None, schema: str | None = None) -> None: """Set the SQLAlchemy engine used by DAWACO readers.""" - global dbname, engine - - engine = database_engine + globals()["engine"] = database_engine if schema is not None: - dbname = schema + globals()["dbname"] = schema def reset_database_engine(schema: str | None = None) -> None: """Reset DAWACO readers to the default lazily-created production engine.""" - global dbname, engine + globals()["engine"] = None + globals()["dbname"] = schema if schema is not None else os.environ.get(SCHEMA_ENV_VAR, DEFAULT_SCHEMA) + - engine = None - dbname = schema if schema is not None else os.environ.get(SCHEMA_ENV_VAR, DEFAULT_SCHEMA) +def _validate_sql_identifier(identifier: str) -> str: + if not _VALID_SQL_IDENTIFIER.fullmatch(identifier): + msg = f"Unsafe SQL identifier: {identifier!r}" + raise ValueError(msg) + + return identifier def _table(name: str) -> str: - return f"{dbname}.{name}" if dbname else name + table_name = _validate_sql_identifier(name) + schema = _validate_sql_identifier(dbname) if dbname else "" + return f"{schema}.{table_name}" if schema else table_name + + +def _sql(query: str, **identifiers: str) -> str: + return query.format_map({key: _validate_sql_identifier(value) for key, value in identifiers.items()}) + + +def _read_sql_query(query: str, params: dict[str, object] | None = None, **kwargs) -> pd.DataFrame: + return pd.read_sql_query(text(query), get_engine(), params=params, **kwargs) + + +def _sql_in_clause(column_name: str, values: Sequence[object], parameter_prefix: str) -> tuple[str, dict[str, object]]: + if len(values) == 0: + return "1 = 0", {} + + column_name = _validate_sql_identifier(column_name) + params = {f"{parameter_prefix}_{i}": value for i, value in enumerate(values)} + placeholders = ", ".join(f":{name}" for name in params) + return f"{column_name} IN ({placeholders})", params + + +def _normalise_mpcodes(mpcode) -> list[str]: + if isinstance(mpcode, str): + return [mpcode] + + if isinstance(mpcode, Iterable): + return [str(code) for code in mpcode] + + msg = "Unsupported mpcode type" + raise ValueError(msg) + + +def _matching_mpcodes(mpcode, *, partial_match_mpcode: bool) -> list[str]: + mpcodes = _normalise_mpcodes(mpcode) + if not partial_match_mpcode: + return mpcodes + available_mpcodes = ( + _read_sql_query(_sql("SELECT MpCode FROM {mp}", mp=_table("mp")))["MpCode"].astype(str).to_numpy() + ) + return [ + available_mpcode + for available_mpcode in available_mpcodes + if any(mpcode in available_mpcode for mpcode in mpcodes) + ] + + +def _normalise_filternr(filternr) -> int: + try: + filternr_float = float(filternr) + except (TypeError, ValueError): + msg = "filternr must be a non-negative integer-like value" + raise ValueError(msg) from None + + if not np.isfinite(filternr_float) or not filternr_float.is_integer() or filternr_float < 0: + msg = "filternr must be a non-negative integer-like value" + raise ValueError(msg) + + return int(filternr_float) -def _read_sql_query(query: str, **kwargs) -> pd.DataFrame: - return pd.read_sql_query(query, get_engine(), **kwargs) + +def _normalise_filternrs(filternr) -> list[int]: + if isinstance(filternr, str) or not isinstance(filternr, Iterable): + return [_normalise_filternr(filternr)] + + return [_normalise_filternr(value) for value in filternr] meteo_header = [ @@ -163,6 +239,23 @@ def _read_sql_query(query: str, **kwargs) -> pd.DataFrame: } +def _meteo_station_frame() -> pd.DataFrame: + return pd.DataFrame(meteo_arr, columns=pd.Index(meteo_header)) + + +def _required_timestamp(value: object, name: str) -> pd.Timestamp: + if not isinstance(value, str | dt.date | dt.datetime | np.datetime64 | pd.Timestamp): + msg = f"{name} must be a valid timestamp" + raise TypeError(msg) + + timestamp = pd.Timestamp(value) + if isinstance(timestamp, pd.Timestamp): + return timestamp + + msg = f"{name} must be a valid timestamp" + raise ValueError(msg) + + def df2gdf(df): """ Convert DataFrame with coordinate columns to GeoDataFrame. @@ -179,24 +272,26 @@ def df2gdf(df): """ df = df.loc[:, ~df.columns.duplicated()].copy() if "Xcoor" in df.columns: - geom = gpd.points_from_xy(df.Xcoor, df.Ycoor, crs="EPSG:28992") + geom = geopandas.points_from_xy(df.Xcoor, df.Ycoor, crs="EPSG:28992") else: - geom = gpd.points_from_xy(df.x, df.y, crs="EPSG:28992") + geom = geopandas.points_from_xy(df.x, df.y, crs="EPSG:28992") - return gpd.GeoDataFrame(df, geometry=geom, crs="EPSG:28992") + return geopandas.GeoDataFrame(df, geometry=geom, crs="EPSG:28992") -def get_daw_mps(mpcode=None, partial_match_mpcode=True): +def get_daw_mps(mpcode=None, *, partial_match_mpcode=True): """Inclusief vervallen! Retreive metadata of all wells. Takes 5 seconds.""" - q = f"SELECT * FROM {_table('mp')} " + q = _sql("SELECT * FROM {mp} ", mp=_table("mp")) - q += fuzzy_match_mpcode( + match_mp_str, params = fuzzy_match_mpcode( mpcode=mpcode, partial_match_mpcode=partial_match_mpcode, mpcode_sql_name="MpCode", + return_params=True, ) + q += match_mp_str - b = _read_sql_query(q, dtype={"Soort": int}) + b = _read_sql_query(q, params=params, dtype={"Soort": int}) b.set_index("MpCode", inplace=True) get_daw_soort_mp(b) @@ -207,22 +302,26 @@ def get_daw_mon_dates(mpcode=None, filternr=None): """Retreive unique water quality sampling dates of all mpcode and filternr provided.""" gwkmon_table = _table("gwkmon") filters_table = _table("filters") - q = ( - f"select datum from {gwkmon_table} as gwkmon " # for debug use * instead of Datum - f"inner join {filters_table} as filters on gwkmon.filtrec = filters.recnum " + q = _sql( + "select datum from {gwkmon} as gwkmon " # for debug use * instead of Datum + "inner join {filters} as filters on gwkmon.filtrec = filters.recnum ", + gwkmon=gwkmon_table, + filters=filters_table, ) - q += fuzzy_match_mpcode( + match_mp_str, params = fuzzy_match_mpcode( mpcode=mpcode, filternr=filternr, partial_match_mpcode=False, mpcode_sql_name="MpCode", filternr_sql_name="filtnr", + return_params=True, ) + q += match_mp_str q += "ORDER BY datum " - b = _read_sql_query(q) + b = _read_sql_query(q, params=params) return pd.to_datetime(b.datum.unique(), format="%Y-%m-%d", errors="coerce") @@ -230,86 +329,74 @@ def get_daw_mon_dates(mpcode=None, filternr=None): def fuzzy_match_mpcode( mpcode=None, filternr=None, + *, partial_match_mpcode=True, mpcode_sql_name="MpCode", filternr_sql_name="filternr", + return_params=False, ): - if mpcode is None: - return "" - - if not partial_match_mpcode and isinstance(mpcode, str): - q = f"WHERE {mpcode_sql_name}='{mpcode}' " - - elif not partial_match_mpcode and isinstance(mpcode, Iterable): - mp_code_Str = "', '".join(mpcode) - q = f"WHERE {mpcode_sql_name} in ('{mp_code_Str}') " - - elif partial_match_mpcode and isinstance(mpcode, str): - mps = _read_sql_query(f"SELECT MpCode FROM {_table('mp')}").values[:, 0] - mpcode_match = mps[[mpcode in s for s in mps]] - mp_code_Str = "', '".join(mpcode_match) - q = f"WHERE {mpcode_sql_name} in ('{mp_code_Str}') " - - elif partial_match_mpcode and isinstance(mpcode, Iterable): - mps = _read_sql_query(f"SELECT MpCode FROM {_table('mp')}").values[:, 0] - mpcode_match = mps[[any(ss in s for ss in mpcode) for s in mps]] - mp_code_Str = "', '".join(mpcode_match) - q = f"WHERE {mpcode_sql_name} in ('{mp_code_Str}') " + """Return a parameterized SQL filter clause for mpcode and filternr selections.""" + conditions = [] + params = {} - else: - assert mpcode is None, "Unsupported mpcode type" - - if filternr is None: - return q - - if isinstance(filternr, str): - filternr_str = filternr - - elif isinstance(filternr, (float, int)): - assert filternr >= 0, "Pumping well is zero and observations wells start counting at 1" - - filternr_str = str(int(filternr)) - - elif isinstance(filternr, list): - for i in filternr: - assert int(i) > 0, "filternr is one-based" - - filternr_str = "', '".join([str(int(i)) for i in filternr]) + if mpcode is not None: + mpcode_clause, mpcode_params = _sql_in_clause( + mpcode_sql_name, + _matching_mpcodes(mpcode, partial_match_mpcode=partial_match_mpcode), + "mpcode", + ) + conditions.append(mpcode_clause) + params.update(mpcode_params) + + if filternr is not None: + filternr_clause, filternr_params = _sql_in_clause( + filternr_sql_name, + _normalise_filternrs(filternr), + "filternr", + ) + conditions.append(filternr_clause) + params.update(filternr_params) - else: - assert filternr is None, "Unsupported filternr type" + query = "" if len(conditions) == 0 else "WHERE " + " AND ".join(conditions) + " " - q += f"AND {filternr_sql_name} in ('{filternr_str}') " + if return_params: + return query, params - return q + return query def get_daw_filters( mpcode=None, filternr=None, + *, partial_match_mpcode=True, vervallen_filters_meenemen=False, return_hpd=False, ): """Retreive metadata of all filters. Takes 25 seconds.""" - match_mp_str = fuzzy_match_mpcode( + match_mp_str, match_mp_params = fuzzy_match_mpcode( mpcode=mpcode, filternr=None, partial_match_mpcode=partial_match_mpcode, mpcode_sql_name="MpCode", filternr_sql_name="Filtnr", + return_params=True, + ) + mps = _read_sql_query(_sql("SELECT * from {mp} ", mp=_table("mp")) + match_mp_str, params=match_mp_params).drop( + columns=["RECNUM"] ) - mps = _read_sql_query(f"SELECT * from {_table('mp')} {match_mp_str}").drop(columns=["RECNUM"]) - match_filt_str = fuzzy_match_mpcode( + match_filt_str, match_filt_params = fuzzy_match_mpcode( mpcode=mpcode, filternr=filternr, partial_match_mpcode=partial_match_mpcode, mpcode_sql_name="MpCode", filternr_sql_name="Filtnr", + return_params=True, ) filters = _read_sql_query( - f"SELECT * from {_table('filters')} {match_filt_str}", + _sql("SELECT * from {filters} ", filters=_table("filters")) + match_filt_str, + params=match_filt_params, dtype={"Filtnr": int}, ).drop(columns=["RECNUM"]) filters = filters[filters.MpCode != " "] @@ -328,6 +415,7 @@ def get_daw_filters( def dw_df_to_hpd(dw_df): + """Convert a DAWACO metadata DataFrame to hydropandas-style columns.""" b = pd.DataFrame( index=dw_df.index, data={ @@ -349,25 +437,31 @@ def dw_df_to_hpd(dw_df): return df2gdf(b) -def get_daw_boring(mpcode=None, join_with_mps=False): +def get_daw_boring(mpcode=None, *, join_with_mps=False): + """Return DAWACO boring layer data for monitoring points.""" nenlaag_table = _table("NenLaag") nennorm_table = _table("NenNorm") nentoev_table = _table("NenToev") - query = ( - f"select * from {nenlaag_table} as NenLaag " - f"inner join {nennorm_table} as NenNorm on NenLaag.Nencode = NenNorm.Code " - f"left join {nentoev_table} as NenToev on NenLaag.Recnum = NenToev.Nenlaagrec " + query = _sql( + "select * from {nenlaag} as NenLaag " + "inner join {nennorm} as NenNorm on NenLaag.Nencode = NenNorm.Code " + "left join {nentoev} as NenToev on NenLaag.Recnum = NenToev.Nenlaagrec ", + nenlaag=nenlaag_table, + nennorm=nennorm_table, + nentoev=nentoev_table, ) - query += fuzzy_match_mpcode( + match_mp_str, params = fuzzy_match_mpcode( mpcode=mpcode, filternr=None, partial_match_mpcode=True, mpcode_sql_name="MpCode", filternr_sql_name="filtnr", + return_params=True, ) + query += match_mp_str - b = _read_sql_query(query) + b = _read_sql_query(query, params=params) b.set_index("MpCode", inplace=True) @@ -380,29 +474,35 @@ def get_daw_boring(mpcode=None, join_with_mps=False): def get_daw_triwaco(mpcode=None): + """Return hydrological stratigraphy layers for monitoring points.""" hydstrat_table = _table("HydStrat") mpmv_table = _table("MpMv") - query = ( + query = _sql( "select e.Mpcode as hydmpcode, e.Bk_pak, e.Type_pak, e.Num_pak, " - f"d.Mpcode, d.Maaiveld from {hydstrat_table} e " - f"inner join {mpmv_table} d on e.MpCode = d.Mpcode " + "d.Mpcode, d.Maaiveld from {hydstrat} e " + "inner join {mpmv} d on e.MpCode = d.Mpcode ", + hydstrat=hydstrat_table, + mpmv=mpmv_table, ) - query += fuzzy_match_mpcode( + match_mp_str, params = fuzzy_match_mpcode( mpcode=mpcode, filternr=None, partial_match_mpcode=True, mpcode_sql_name="e.Mpcode", filternr_sql_name="filtnr", + return_params=True, ) + query += match_mp_str - b = _read_sql_query(query) + b = _read_sql_query(query, params=params) del b["hydmpcode"] + b = b.sort_values(["Mpcode", "Bk_pak", "Num_pak", "Type_pak"], kind="mergesort").reset_index(drop=True) - b["dikte"] = b.groupby("Mpcode")["Bk_pak"].transform(lambda x: x.diff().shift(-1)) + b["dikte"] = b.groupby("Mpcode", sort=False)["Bk_pak"].transform(lambda x: x.diff().shift(-1)) # remove layers that have 1 cm thickness minimal thickness dawaco - b = b[np.logical_or(b["dikte"] > 0.01, pd.isna(b.dikte))] + b = b[np.logical_or(b["dikte"] > MIN_TRIWACO_LAYER_THICKNESS_M, pd.isna(b.dikte))] b["bkp_nap"] = -b["Bk_pak"] + b["Maaiveld"] b["okp_nap"] = b["bkp_nap"] - b["dikte"] @@ -411,6 +511,7 @@ def get_daw_triwaco(mpcode=None): def get_daw_soort_mp(a, key="Soort"): + """Map DAWACO monitoring point type codes to readable labels in-place.""" foutje = { "19ANL5196": 5, "19ANL5197": 5, @@ -435,11 +536,16 @@ def get_daw_soort_mp(a, key="Soort"): a[key] = a[key].map(sd) -def get_daw_coords_from_mpcode(mpcode=None, partial_match_mpcode=True): - assert isinstance(mpcode, str), "Only strings are accepted" +def get_daw_coords_from_mpcode(mpcode=None, *, partial_match_mpcode=True): + """Return RD New coordinates for a single monitoring point code.""" + if not isinstance(mpcode, str): + msg = "Only strings are accepted" + raise TypeError(msg) mp = get_daw_mps(mpcode=mpcode, partial_match_mpcode=partial_match_mpcode) - assert len(mp) == 1, f"Ambigous mpcode: {', '.join(mp.index)}" + if len(mp) != 1: + msg = f"Ambiguous mpcode: {', '.join(mp.index)}" + raise ValueError(msg) x, y = mp[["Xcoor", "Ycoor"]].values[0] @@ -447,46 +553,64 @@ def get_daw_coords_from_mpcode(mpcode=None, partial_match_mpcode=True): def get_daw_meteo_from_loc(x=None, y=None, mpcode=None, mettype=None, start_date=None, end_date=None): - """ - Returns the nearest meteo stations that are needed to fill a timeseries from - start_date to end_date as `out`. + """Return nearest meteo stations needed to fill a timeseries. + + The returned series covers ``start_date`` to ``end_date``. A timeseries is composed using most data of the nearest station and using more remote stations to fill the gaps and is returned as df_out. """ if mpcode is not None: - assert x is None and y is None, "Use either the coodinates or mpcode to refer to a location" + if x is not None or y is not None: + msg = "Use either the coordinates or mpcode to refer to a location" + raise ValueError(msg) x, y = get_daw_coords_from_mpcode(mpcode=mpcode, partial_match_mpcode=True) - assert start_date is not None - assert end_date is not None - assert mettype in meteo_pars - - start_date = pd.Timestamp(start_date).floor(freq="D") - end_date = pd.Timestamp(end_date).ceil(freq="D") - istart = meteo_header.index(meteo_pars[mettype] + "_start") - iend = meteo_header.index(meteo_pars[mettype] + "_end") - within_dates = np.array([row[istart] <= end_date and row[iend] >= start_date for row in meteo_arr]) # False for NaT - distance = np.array([((row[2] - x) ** 2 + (row[3] - y) ** 2) ** 0.5 for row in meteo_arr]) + if x is None or y is None: + msg = "Provide x/y coordinates or mpcode" + raise ValueError(msg) + x_coord = float(x) + y_coord = float(y) + + if mettype not in meteo_pars: + msg = f"Unsupported mettype: {mettype!r}" + raise ValueError(msg) + + start_date = _required_timestamp(start_date, "start_date").floor("D") + end_date = _required_timestamp(end_date, "end_date").ceil("D") + meteo = _meteo_station_frame() + start_key = meteo_pars[mettype] + "_start" + end_key = meteo_pars[mettype] + "_end" + station_start = pd.to_datetime(meteo[start_key]) + station_end = pd.to_datetime(meteo[end_key]) + within_dates = ((station_start <= end_date) & (station_end >= start_date)).to_numpy(dtype=bool) + distance = np.sqrt( + (meteo["x"].to_numpy(dtype=float) - x_coord) ** 2 + (meteo["y"].to_numpy(dtype=float) - y_coord) ** 2 + ) isorts = np.arange(len(meteo_arr))[within_dates][np.argsort(distance[within_dates])] + if len(isorts) == 0: + msg = f"No meteo station covers {mettype} from {start_date.date()} to {end_date.date()}." + raise ValueError(msg) isorts_nooverlap = [isorts[0]] nooverlap_start, nooverlap_end = ( - meteo_arr[isorts[0]][istart], - meteo_arr[isorts[0]][iend], + station_start.iloc[isorts[0]], + station_end.iloc[isorts[0]], ) for isort in isorts: - if (meteo_arr[isort][istart] < nooverlap_start and nooverlap_start > start_date) or ( - meteo_arr[isort][iend] > nooverlap_end and nooverlap_end < end_date + candidate_start = station_start.iloc[isort] + candidate_end = station_end.iloc[isort] + if (candidate_start < nooverlap_start and nooverlap_start > start_date) or ( + candidate_end > nooverlap_end and nooverlap_end < end_date ): isorts_nooverlap.append(isort) - nooverlap_start, nooverlap_end = ( - meteo_arr[isort][istart], - meteo_arr[isort][iend], - ) + nooverlap_start, nooverlap_end = candidate_start, candidate_end - out = [(meteo_arr[i][0], distance[i], get_daw_ts_meteo(meteo_arr[i][0], mettype)) for i in isorts_nooverlap] + out = [ + (str(meteo.iloc[i]["statcode"]), distance[i], get_daw_ts_meteo(str(meteo.iloc[i]["statcode"]), mettype)) + for i in isorts_nooverlap + ] # construct merged_ts: if len(out) > 1: @@ -504,37 +628,23 @@ def get_daw_meteo_from_loc(x=None, y=None, mpcode=None, mettype=None, start_date def get_daw_meteo_arr_daterange(): - """Constructs the array used by get_daw_meteo_from_loc.""" - import pprint - - l = [] - for sc in meteo_arr: - ll = [] - for mt in [ - "Neerslag", - "Temperatuur", - "Temp. maximum", - "Temp. minimum", - "Verdamping", - ]: - df = get_daw_ts_meteo(sc[0], mt) - ll.extend([df.index.min(), df.index.max()]) - - l.append(ll) - - arr3 = [[a, b, int(c), int(d), *lli] for (a, b, c, d), lli in zip([a[:4] for a in meteo_arr], l)] - for ai in arr3: - s = pprint.pformat(ai, width=999, indent=4) - # s = s.replace("Timestamp", "pd.Timestamp") - s = s.replace("Timestamp", "a") - s = s.replace(" 00:00:00')", "')") - # s = s.replace("NaT", "pd.NaT") - s = s.replace("NaT", "b") - s += "," + """Return station date ranges reconstructed from meteo measurements.""" + rows = [] + meteo = _meteo_station_frame() + for _, station in meteo.iterrows(): + row = [station["statcode"], station["name"], int(station["x"]), int(station["y"])] + for mettype in meteo_pars: + timeseries = get_daw_ts_meteo(str(station["statcode"]), mettype) + row.extend([timeseries.index.min(), timeseries.index.max()]) + + rows.append(row) + + return pd.DataFrame(rows, columns=pd.Index(meteo_header)) def get_knmi_station_meta(): - """ + """Return KNMI station metadata. + Op deze pagina treft U een overzicht aan van alle tijdreeksen van de 320 actuele neerslagstations en 350 historische neerslagstations. Vermeld wordt de 24-uurs neerslagsom, gemeten van 0800 utc op de voorafgaande dag tot 0800 utc op de vermelde datum. De hoeveelheid wordt weergegeven in tienden van millimeters. De neerslaggegevens @@ -549,7 +659,8 @@ def get_knmi_station_meta(): Returns ------- - + pandas.DataFrame + KNMI station metadata indexed by station code. """ def mydateparser(x): @@ -568,7 +679,7 @@ def mydateparser(x): def get_daw_ts_meteo(statcode, mettype): - """ + """Return a DAWACO meteo time series for a station and meteo type. Parameters ---------- @@ -593,20 +704,24 @@ def get_daw_ts_meteo(statcode, mettype): TMIN Temp. minimum oC V Verdamping mm """ - assert statcode in [row[0] for row in meteo_arr], "not a valid statcode" - assert mettype in meteo_pars, "not a valid mettype" - - q = f"SELECT * FROM {_table('metwaar')} WHERE code = '{statcode}' and code_par = '{meteo_pars[mettype]}' " - b = _read_sql_query(q) + if statcode not in [row[0] for row in meteo_arr]: + msg = "not a valid statcode" + raise ValueError(msg) + if mettype not in meteo_pars: + msg = "not a valid mettype" + raise ValueError(msg) + + q = _sql("SELECT * FROM {metwaar} WHERE code = :statcode and code_par = :metpar ", metwaar=_table("metwaar")) + b = _read_sql_query(q, params={"statcode": statcode, "metpar": meteo_pars[mettype]}) waarnemingen = b[[s for s in b.columns if "W_d" in s]].values.reshape(-1) jaar = np.repeat(b.Jaar.values, 31).astype(int).astype(str) maand = np.repeat(b.Maand.values, 31).astype(int).astype(str) dag = np.tile(np.arange(1, 32), len(b)).astype(int).astype(str) - dates_str = np.array([x1 + "-" + x2 + "-" + x3 for x1, x2, x3 in zip(jaar, maand, dag)]) + dates_str = np.array([x1 + "-" + x2 + "-" + x3 for x1, x2, x3 in zip(jaar, maand, dag, strict=True)]) # sometimes missing value is -99 and sometimes 0. dates = pd.to_datetime(dates_str, format="%Y-%m-%d", errors="coerce") - mask = np.logical_and(waarnemingen != -99.0, ~pd.isnull(dates)) + mask = np.logical_and(waarnemingen != METEO_MISSING_VALUE, ~pd.isnull(dates)) # Is an error made if there are multiple values? # print('Missing value: ', np.unique(waarnemingen[pd.isnull(dates)])) @@ -619,25 +734,35 @@ def get_daw_ts_meteo(statcode, mettype): def get_daw_ts_stijghgt(mpcode=None, filternr=None): - assert mpcode is not None and filternr is not None, "Define mpcode and filternr" + """Return groundwater-level measurements for a monitoring point filter.""" + if mpcode is None or filternr is None: + msg = "Define mpcode and filternr" + raise ValueError(msg) + filternr_value = _normalise_filternr(filternr) stijghgt_table = _table("Stijghgt") filters_table = _table("Filters") - query = f""" + query = _sql( + """ SELECT datum, tijd, meting_nap - FROM {stijghgt_table} as Stijghgt - INNER JOIN {filters_table} as Filters on Filters.recnum = Stijghgt.filtrec - WHERE Filters.mpcode = '{mpcode!s}' and Filters.filtnr = '{filternr!s}'""" + FROM {stijghgt} as Stijghgt + INNER JOIN {filters} as Filters on Filters.recnum = Stijghgt.filtrec + WHERE Filters.mpcode = :mpcode and Filters.filtnr = :filternr""", + stijghgt=stijghgt_table, + filters=filters_table, + ) query += """\nORDER BY datum, tijd""" - b = _read_sql_query(query) + b = _read_sql_query(query, params={"mpcode": mpcode, "filternr": filternr_value}) values = b["meting_nap"].values - values[values < -60.0] = np.nan + values[values < MIN_VALID_HEAD_NAP] = np.nan if len(b) == 0: - mps = _read_sql_query(f"SELECT MpCode FROM {_table('mp')}").values[:, 0] - assert mpcode in mps, f"mpcode: {mpcode} not in Dawaco" + mps = _read_sql_query(_sql("SELECT MpCode FROM {mp}", mp=_table("mp"))).values[:, 0] + if mpcode not in mps: + msg = f"mpcode: {mpcode} not in Dawaco" + raise ValueError(msg) out = pd.Series( data=values, @@ -649,20 +774,28 @@ def get_daw_ts_stijghgt(mpcode=None, filternr=None): def get_daw_ts_temp(mpcode=None, filternr=None): - assert mpcode is not None and filternr is not None, "Define mpcode and filternr" + """Return temperature measurements for a monitoring point filter.""" + if mpcode is None or filternr is None: + msg = "Define mpcode and filternr" + raise ValueError(msg) + filternr_value = _normalise_filternr(filternr) stijghgt_table = _table("Stijghgt") filters_table = _table("Filters") - query = f""" + query = _sql( + """ SELECT datum, tijd, Temp - FROM {stijghgt_table} as Stijghgt - INNER JOIN {filters_table} as Filters on Filters.recnum = Stijghgt.filtrec - WHERE Filters.mpcode = '{mpcode!s}' and Filters.filtnr = '{filternr!s}' - ORDER BY datum, tijd""" + FROM {stijghgt} as Stijghgt + INNER JOIN {filters} as Filters on Filters.recnum = Stijghgt.filtrec + WHERE Filters.mpcode = :mpcode and Filters.filtnr = :filternr + ORDER BY datum, tijd""", + stijghgt=stijghgt_table, + filters=filters_table, + ) - b = _read_sql_query(query) + b = _read_sql_query(query, params={"mpcode": mpcode, "filternr": filternr_value}) values = b["Temp"].values - values[values < -60.0] = np.nan + values[values < MIN_VALID_TEMPERATURE_C] = np.nan values[values == 0.0] = np.nan name = f"{mpcode!s}_{filternr!s}" @@ -683,8 +816,10 @@ def get_hpd_gws_obs( df_filter=None, mpcode=None, filternr=None, + *, partial_match_mpcode=True, ): + """Return a hydropandas GroundwaterObs for a single DAWACO filter.""" if df_filter is None: filter_metadata_df = get_daw_filters( mpcode=mpcode, @@ -694,11 +829,13 @@ def get_hpd_gws_obs( else: filter_metadata_df = df_filter - assert len(filter_metadata_df) == 1, ( - f"Your mpcode is not specific enough. " - f"Multiple are returned: \n{filter_metadata_df}. \n" - f"Or set `partial_match_mpcode` to `False` and use the complete `mpcode`" - ) + if len(filter_metadata_df) != 1: + msg = ( + f"Your mpcode is not specific enough. " + f"Multiple are returned: \n{filter_metadata_df}. \n" + f"Or set `partial_match_mpcode` to `False` and use the complete `mpcode`" + ) + raise ValueError(msg) filter_metadata = filter_metadata_df.iloc[0] gws_measurements = get_daw_ts_stijghgt( @@ -738,8 +875,11 @@ def get_hpd_gws_obs( ) -def get_nlmod_index_nearest_cell(fils, model_ds, error_if_nearest_cell_inactive=False): - assert isinstance(fils, gpd.GeoDataFrame) +def get_nlmod_index_nearest_cell(fils, model_ds, *, error_if_nearest_cell_inactive=False): + """Return nearest nlmod model cell indices for DAWACO filters.""" + if not isinstance(fils, geopandas.GeoDataFrame): + msg = "fils must be a geopandas.GeoDataFrame" + raise TypeError(msg) fils = fils.copy() qxc = xr.DataArray( @@ -761,7 +901,9 @@ def get_nlmod_index_nearest_cell(fils, model_ds, error_if_nearest_cell_inactive= nearest_coords = distances.argmin(dim=("icell2d", "layer")) if error_if_nearest_cell_inactive: - assert (model_ds.idomain.isel(**nearest_coords) > 0).all(), "Filters are placed in inactive cells" + if not (model_ds.idomain.isel(**nearest_coords) > 0).all(): + msg = "Filters are placed in inactive cells" + raise ValueError(msg) elif (model_ds.idomain.isel(**nearest_coords) > 0).all(): pass @@ -772,8 +914,9 @@ def get_nlmod_index_nearest_cell(fils, model_ds, error_if_nearest_cell_inactive= return pd.DataFrame(nearest_coords, index=fils.index) -def get_nlmod_vertical_profile(model_ds, x, y, label, active_only=True): - """Get vertical profile of model_ds[label] given global coordinates +def get_nlmod_vertical_profile(model_ds, x, y, label, *, active_only=True): + """Get vertical profile of model_ds[label] given global coordinates. + The returned array contains the [top_cell, bot_cell, value] for all active cells. Returned array has size (3, nlay_active). """ @@ -788,6 +931,7 @@ def get_nlmod_vertical_profile(model_ds, x, y, label, active_only=True): def get_regis_ds(rds_x, rds_y, keys=None): + """Return a REGIS dataset nearest to RD New coordinates.""" regis_ds = xr.open_dataset("https://dinodata.nl/opendap/REGIS/REGIS.nc") dsi_r = regis_ds.sel(x=rds_x, y=rds_y, method="nearest") @@ -801,8 +945,7 @@ def get_regis_ds(rds_x, rds_y, keys=None): def identify_data_gaps(series): - """ - Add NaN's at places where data is expected. + """Add NaN's at places where data is expected. instead of == maybe use isclose @@ -812,35 +955,44 @@ def identify_data_gaps(series): Returns ------- - + pandas.Series + Original series plus inserted NaNs at inferred gaps. """ series = series[pd.notna(series.index)] - index, values = series.index, series.values - dt = (index[1:] - index[:-1]).values - assert np.all(dt.astype(float) > 0), "Index is not sorted or has duplicates" + if len(series) < MIN_SERIES_LENGTH_FOR_GAP_FILL: + return series - iden = (np.roll(dt, -2) == np.roll(dt, -1)) & (np.roll(dt, 1) == np.roll(dt, 2)) & (dt >= 2 * np.roll(dt, -1)) + index = series.index + index_delta = (index[1:] - index[:-1]).values + if not np.all(index_delta.astype(float) > 0): + msg = "Index is not sorted or has duplicates" + raise ValueError(msg) + + iden = ( + (np.roll(index_delta, -2) == np.roll(index_delta, -1)) + & (np.roll(index_delta, 1) == np.roll(index_delta, 2)) + & (index_delta >= MIN_SERIES_LENGTH_FOR_GAP_FILL * np.roll(index_delta, -1)) + ) start, end, delta, delta_prev = ( index[:-1][iden], index[1:][iden], - dt[iden], - np.roll(dt, -1)[iden], + index_delta[iden], + np.roll(index_delta, -1)[iden], ) - out_index, out_values = index.copy(), values.copy() - for si, ei, _di, dpi in zip(start, end, delta, delta_prev): + inserted = [] + for si, ei, _di, dpi in zip(start, end, delta, delta_prev, strict=True): t_insert = np.arange(si, ei, dpi)[1:] - v_insert = np.full(t_insert.shape, np.nan, dtype=float) - before_ind = np.argwhere(index == ei).item() - out_index = np.insert(out_index, before_ind, t_insert) - out_values = np.insert(out_values, before_ind, v_insert) + if len(t_insert) > 0: + inserted.append(pd.Series(data=np.nan, index=pd.to_datetime(t_insert), name=series.name)) - out = pd.Series(data=out_values, index=out_index, name=series.name) + out = pd.concat([series, *inserted]).sort_index(kind="mergesort") if inserted else series.copy() - out_dt = (index[1:] - index[:-1]).values - if ~np.all(out_dt.astype(float) > 0): - print( - f"Unable to fill gaps of {series.name}. Index is not sorted or has duplicates: {out_dt}. Returning original series." + out_dt = (out.index[1:] - out.index[:-1]).values + if not np.all(out_dt.astype(float) > 0): + warnings.warn( + f"Unable to fill gaps of {series.name}. Index is not sorted or has duplicates: {out_dt}. Returning original series.", + stacklevel=2, ) return series return out diff --git a/dawacotools/io_plenty.py b/dawacotools/io_plenty.py index 1815667..f6c382b 100644 --- a/dawacotools/io_plenty.py +++ b/dawacotools/io_plenty.py @@ -77,23 +77,23 @@ "HEI818": lambda s: s == "19CZIM8 18", "HEI819": lambda s: s == "19CZIM8 19", "HEI820": lambda s: s == "19CZIM8 20", - "CAA1DP_FT10": lambda s: False, # Bemaling? - "HNWHAA_FQ10P": lambda s: False, # Gooi - "LAWLAA_FQ10P": lambda s: False, # Gooi - "LAWLAA_FQ20R": lambda s: False, # Gooi - "LAWLAA_FQ10R": lambda s: False, # Gooi + "CAA1DP_FT10": lambda _s: False, # Bemaling? + "HNWHAA_FQ10P": lambda _s: False, # Gooi + "LAWLAA_FQ10P": lambda _s: False, # Gooi + "LAWLAA_FQ20R": lambda _s: False, # Gooi + "LAWLAA_FQ10R": lambda _s: False, # Gooi } # FLOW: m3/h inclusief return flow. Infiltration is positive. Extraction is negative. (MODFLOW convention) secs_pa_flow = { "CAWP01": "-(4 * CAWP01_FQ10R + 4 * CAWP01_FQ11R)", - "CAWP02": "-(CAWP02_FQ10R.mul(4).add(CAWP02_FQ11R.mul(4))).where((index < '2016-01-01').mul(index >= '2019-01-01'), other=CAWP02_FT10.add(CAWP02_FQ11R.mul(4)))", + "CAWP02": "-(CAWP02_FQ10R.mul(4).add(CAWP02_FQ11R.mul(4))).where((index < '2016-01-01') | (index >= '2019-01-01'), other=CAWP02_FT10.add(CAWP02_FQ11R.mul(4)))", "CAWP03": "-(4 * CAWP03_FQ10R + 4 * CAWP03_FQ11R)", "CAWP04": "-(4 * CAWP04_FQ10R + 4 * CAWP04_FQ11R)", - "CAWP05": "-(CAWP05_FQ10R.mul(4).add(CAWP05_FQ11R.mul(4))).where((index < '2016-02-01').mul(index >= '2018-01-01'), other=CAWP05_FT10.add(CAWP05_FQ11R.mul(4)))", + "CAWP05": "-(CAWP05_FQ10R.mul(4).add(CAWP05_FQ11R.mul(4))).where((index < '2016-02-01') | (index >= '2018-01-01'), other=CAWP05_FT10.add(CAWP05_FQ11R.mul(4)))", "CAWP06": "-(4 * CAWP06_FQ10R + 4 * CAWP06_FQ11R)", - "CAWQ01": "-(CAWQ01_FQ10R.mul(4).add(CAWQ01_FQ11R.mul(4))).where((index < '2012-01-01').mul(index >= '2013-01-01'), other=CAWQ01_FT10.add(CAWQ01_FQ11R.mul(4)))", + "CAWQ01": "-(CAWQ01_FQ10R.mul(4).add(CAWQ01_FQ11R.mul(4))).where((index < '2012-01-01') | (index >= '2013-01-01'), other=CAWQ01_FT10.add(CAWQ01_FQ11R.mul(4)))", "CAWQ02": "-(4 * CAWQ02_FQ10R + 4 * CAWQ02_FQ11R)", "CAWQ03": "-(4 * CAWQ03_FQ10R + 4 * CAWQ03_FQ11R)", "CAWQ04": "-(4 * CAWQ04_FQ10R + 4 * CAWQ04_FQ11R)", @@ -104,7 +104,7 @@ "HEW903": "-(4 * HEW903_FQ10R + 4 * HEW903_FQ11R)", "HEW904": "-(4 * HEW904_FQ10R + 4 * HEW904_FQ11R)", "HEW905": "-(4 * HEW905_FQ10R + 4 * HEW905_FQ11R)", - "HEW906": "-(HEW906_FQ10R.mul(4).add(HEW906_FQ11R.mul(4))).where((index < '2016-01-01').mul(index >= '2018-01-01'), other=HEW906_FT10.add(HEW906_FQ11R.mul(4)))", + "HEW906": "-(HEW906_FQ10R.mul(4).add(HEW906_FQ11R.mul(4))).where((index < '2016-01-01') | (index >= '2018-01-01'), other=HEW906_FT10.add(HEW906_FQ11R.mul(4)))", "HEW101": "-(4 * HEW101_FQ10R + 4 * HEW101_FQ11R)", "HEW102": "-(4 * HEW102_FQ10R + 4 * HEW102_FQ11R)", "HEW103": "-(4 * HEW103_FQ10R + 4 * HEW103_FQ11R)", @@ -116,9 +116,9 @@ "CAWAAF": "-4 * CAWAAF_FQ10R", "CAWHAA": "-4 * CAWHAA_FQ10R", "HEW4AA": "-4 * HEW4AA_FQ10R", - "HEWPAF": "-(HEWPAF_FQ10R.mul(4)).where((index < '2012-01-01').mul(index >= '2013-01-01'), other=HEWPAF_FT10)", - "BEWARU": "-(BEWARU_FQ10R.mul(4)).where((index < '2004-01-01').mul(index >= '2005-01-01'), other=BEWARU_FT10)", - "BEWBRU": "-(BEWBRU_FQ10R.mul(4)).where((index < '2016-01-01').mul(index >= '2018-01-01'), other=BEWBRU_FT10)", + "HEWPAF": "-(HEWPAF_FQ10R.mul(4)).where((index < '2012-01-01') | (index >= '2013-01-01'), other=HEWPAF_FT10)", + "BEWARU": "-(BEWARU_FQ10R.mul(4)).where((index < '2004-01-01') | (index >= '2005-01-01'), other=BEWARU_FT10)", + "BEWBRU": "-(BEWBRU_FQ10R.mul(4)).where((index < '2016-01-01') | (index >= '2018-01-01'), other=BEWBRU_FT10)", "BEWCRU": "-4 * BEWCRU_FQ10R", "BEWPRU": "-4 * BEWPRU_FQ10R", "HEW8AF": "-4 * HEW8AF_FQ10R", # Double check. correct according to overview Henk @@ -135,7 +135,8 @@ "HEW811": "-4 * HEW811_FQ10R", "HEW812": "-4 * HEW812_FQ10R", # Only use these if deltatime is 15 minutes or less, else use HEI, or create new tags with avg HEI8AA_FQ10R - "HEI801": "4 * HEI801_FQ10R - 4 * HEI8AA_FQ10R / 20", # TODO: Alternative division of return flow HvH + # Domain assumption: distribute HEI8AA return flow evenly over the 20 HEI8xx injection wells. + "HEI801": "4 * HEI801_FQ10R - 4 * HEI8AA_FQ10R / 20", "HEI802": "4 * HEI802_FQ10R - 4 * HEI8AA_FQ10R / 20", "HEI803": "4 * HEI803_FQ10R - 4 * HEI8AA_FQ10R / 20", "HEI804": "4 * HEI804_FQ10R - 4 * HEI8AA_FQ10R / 20", @@ -234,9 +235,12 @@ "Gooi-Tot-Ont": "-(LAWLAAZUID + LAWLAANOORD + HNWHAA)", } +PLENTY_DAILY_SUM_FLAG = 5.0 +PLENTY_MAX_ABS_VALUE = 10000.0 -def mpcode_to_sec_pa_tag(mpcode): - """Returns the sec_pa tag for a single mpcode. If no sec_pa tag is found, an empty string is returned. + +def mpcode_to_sec_pa_tag(mpcode: str) -> str: + """Return the sec_pa tag for a single mpcode. Parameters ---------- @@ -256,8 +260,8 @@ def mpcode_to_sec_pa_tag(mpcode): return "" -def mpcode_to_sec_pa_flow(df_plenty, mpcode): - """Returns the flow for a single mpcode. +def mpcode_to_sec_pa_flow(df_plenty: pd.DataFrame, mpcode: str) -> pd.Series: + """Return the flow for a single mpcode. Parameters ---------- @@ -271,14 +275,16 @@ def mpcode_to_sec_pa_flow(df_plenty, mpcode): flow : float Flow in m3/h for the specific well """ - assert isinstance(mpcode, str), "single mpcode allowed" + if not isinstance(mpcode, str): + msg = "single mpcode allowed" + raise TypeError(msg) patag = mpcode_to_sec_pa_tag(mpcode) flow_eq = secs_pa_flow[patag] return df_plenty.eval(flow_eq) -def get_required_patags_for_flow(df=None): - """Returns the required Plenty tags for the flow calculation. +def get_required_patags_for_flow(df: pd.DataFrame | None = None) -> set[str]: + """Return the required Plenty tags for the flow calculation. If `df` is None, the tags are returned for all wells. Otherwise, the tags are returned for the wells in `df`. @@ -304,8 +310,8 @@ def get_required_patags_for_flow(df=None): return set(filter(lambda s: "_" in s, comb.split(" "))) -def get_sec_pa(df): - """Returns the sec_pa tag for each well in `df`. +def get_sec_pa(df: pd.DataFrame) -> list[str]: + """Return the sec_pa tag for each well in `df`. Parameters ---------- @@ -319,11 +325,11 @@ def get_sec_pa(df): """ ispomp = ispomp_filter(df) mpcodes = df.reset_index().MpCode - return np.where(ispomp, list(map(mpcode_to_sec_pa_tag, mpcodes)), "") + return np.where(ispomp, list(map(mpcode_to_sec_pa_tag, mpcodes)), "").astype(str).tolist() -def ispomp_filter(df): - """Returns a boolean array indicating whether a well is a pump well or not. Soort in dawaco db is not correct. +def ispomp_filter(df: pd.DataFrame) -> pd.Series: + """Return a boolean array indicating whether each well is a pump well. Parameters ---------- @@ -338,8 +344,8 @@ def ispomp_filter(df): return df.Filtnr == 0 -def get_nput_dict(df): - """Returns a dictionary with the nput for each well in `df`. +def get_nput_dict(df: pd.DataFrame) -> dict[str, float]: + """Return a dictionary with the nput for each well in `df`. Parameters ---------- @@ -354,13 +360,13 @@ def get_nput_dict(df): ispomp = ispomp_filter(df) mpcodes = df[ispomp].reset_index().MpCode u, counts = np.unique(list(map(mpcode_to_sec_pa_tag, mpcodes)), return_counts=True) - nput_dict = dict(zip(u, counts)) + nput_dict = {str(k): float(v) for k, v in zip(u, counts, strict=True)} nput_dict[""] = np.nan return nput_dict -def get_nput(df): - """Returns the nput for each well in `df`. +def get_nput(df: pd.DataFrame) -> list[float]: + """Return the nput for each well in `df`. Parameters ---------- @@ -374,11 +380,44 @@ def get_nput(df): """ sec = get_sec_pa(df) nput_dict = get_nput_dict(df) - return [nput_dict.get(k, k) for k in sec] + return [nput_dict[k] for k in sec] + + +def _normalize_single_plenty_row(df_plenty: pd.Series | pd.DataFrame) -> pd.DataFrame: + if isinstance(df_plenty, pd.Series): + try: + date = pd.Timestamp(str(df_plenty.name)) + except (ValueError, TypeError): + date = pd.Timestamp.now() + + return pd.DataFrame( + columns=df_plenty.index.values, + data=df_plenty.values.reshape(1, -1), + index=pd.DatetimeIndex([date]), + ) + + if isinstance(df_plenty, pd.DataFrame): + if len(df_plenty) != 1: + msg = "Only one value/timestamp/average allowed" + raise ValueError(msg) + return df_plenty + + raise NotImplementedError -def get_flow(df, df_plenty, divide_by_nput=True): - """Returns the flow for each well in `df`. +def _eval_single_plenty_flow(df_plenty: pd.DataFrame, flow_eq: str) -> float: + if not flow_eq: + return np.nan + + flow = df_plenty.eval(flow_eq) + if isinstance(flow, pd.Series): + return float(flow.iloc[0]) + + return float(np.asarray(flow).reshape(-1)[0]) + + +def get_flow(df: pd.DataFrame, df_plenty: pd.Series | pd.DataFrame, *, divide_by_nput: bool = True) -> np.ndarray: + """Return the flow for each well in `df` for one Plenty timestamp. Parameters ---------- @@ -392,47 +431,34 @@ def get_flow(df, df_plenty, divide_by_nput=True): Returns ------- flow : np.ndarray - Flow for each well in `df` if divide_by_nput is True. Otherwise, the flow is returned for each well in `df` without division by the nput. + One-dimensional flow array. Non-pump rows receive ``np.nan``. """ - if isinstance(df_plenty, pd.Series): - try: - date = pd.Timestamp(df_plenty.name) - except (ValueError, TypeError): - date = pd.Timestamp.now() - - df_plenty = pd.DataFrame( - columns=df_plenty.index.values, - data=df_plenty.values.reshape(1, -1), - index=[date], - ) - - elif isinstance(df_plenty, pd.DataFrame): - assert len(df_plenty) == 1, "Only one value value/timestamp/average allowed" - else: - raise NotImplementedError + df_plenty = _normalize_single_plenty_row(df_plenty) required_pa_tags = get_required_patags_for_flow(df=df) - assert all(i in df_plenty.columns for i in required_pa_tags), ( - f"`df_plenty` requires the following Plenty tags:\n{required_pa_tags}" - ) - assert np.issubdtype(df_plenty.index, np.datetime64), "Index needs to be a date" + if missing_tags := required_pa_tags.difference(df_plenty.columns): + msg = f"`df_plenty` requires the following Plenty tags:\n{missing_tags}" + raise ValueError(msg) + if not isinstance(df_plenty.index, pd.DatetimeIndex): + msg = "Index needs to be a date" + raise TypeError(msg) sec = get_sec_pa(df) u_sec = set(sec) u_pa_flow_eqs = [secs_pa_flow.get(k, k) for k in u_sec] - u_pa_flow = [df_plenty.eval(eq) if eq else np.nan for eq in u_pa_flow_eqs] + u_pa_flow = [_eval_single_plenty_flow(df_plenty, eq) for eq in u_pa_flow_eqs] if divide_by_nput: nput_dict = get_nput_dict(df) - pa_flow_dict = {k: v / nput_dict[k] for k, v in zip(u_sec, u_pa_flow)} + pa_flow_dict = {k: v / nput_dict[k] for k, v in zip(u_sec, u_pa_flow, strict=True)} else: - pa_flow_dict = dict(zip(u_sec, u_pa_flow)) + pa_flow_dict = dict(zip(u_sec, u_pa_flow, strict=True)) return np.array([pa_flow_dict.get(k, k) for k in sec], dtype=float) -def get_flows(df, df_plenty, divide_by_nput=True): - """Returns the flow for each well in `df`. +def get_flows(df: pd.DataFrame, df_plenty: pd.Series | pd.DataFrame, *, divide_by_nput: bool = True) -> np.ndarray: + """Alias for :func:`get_flow` retained for backward compatibility. Parameters ---------- @@ -446,47 +472,13 @@ def get_flows(df, df_plenty, divide_by_nput=True): Returns ------- flow : np.ndarray - Flow for each well in `df` + One-dimensional flow array. Non-pump rows receive ``np.nan``. """ - if isinstance(df_plenty, pd.Series): - try: - date = pd.Timestamp(df_plenty.name) - except (ValueError, TypeError): - date = pd.Timestamp.now() + return get_flow(df, df_plenty, divide_by_nput=divide_by_nput) - df_plenty = pd.DataFrame( - columns=df_plenty.index.values, - data=df_plenty.values.reshape(1, -1), - index=[date], - ) - - elif isinstance(df_plenty, pd.DataFrame): - assert len(df_plenty) == 1, "Only one value value/timestamp/average allowed" - else: - raise NotImplementedError - required_pa_tags = get_required_patags_for_flow(df=df) - assert all(i in df_plenty.columns for i in required_pa_tags), ( - f"`df_plenty` requires the following Plenty tags:\n{required_pa_tags}" - ) - assert np.issubdtype(df_plenty.index, np.datetime64), "Index needs to be a date" - - sec = get_sec_pa(df) - u_sec = set(sec) - u_pa_flow_eqs = [secs_pa_flow.get(k, k) for k in u_sec] - u_pa_flow = [df_plenty.eval(eq) if eq else np.nan for eq in u_pa_flow_eqs] - - if divide_by_nput: - nput_dict = get_nput_dict(df) - pa_flow_dict = {k: v / nput_dict[k] for k, v in zip(u_sec, u_pa_flow)} - else: - pa_flow_dict = dict(zip(u_sec, u_pa_flow)) - - return np.array([pa_flow_dict.get(k, k) for k in sec], dtype=float) - - -def get_sec_pa_flows(df_plenty): - """Returns the sec_pa_flows as defined in `secs_pa_flow` for the Plenty data `df_plenty`. +def get_sec_pa_flows(df_plenty: pd.DataFrame) -> pd.DataFrame: + """Return the sec_pa flows as defined in `secs_pa_flow` for the Plenty data `df_plenty`. Parameters ---------- @@ -501,8 +493,8 @@ def get_sec_pa_flows(df_plenty): return pd.DataFrame({k: df_plenty.eval(v) for k, v in secs_pa_flow.items()}) -def get_tra_flows(df_plenty): - """Returns the tra_flows as defined in `tra_alias` for the Plenty data `df_plenty`. +def get_tra_flows(df_plenty: pd.DataFrame) -> pd.DataFrame: + """Return the TRA flows as defined in `tra_alias` for the Plenty data `df_plenty`. Parameters ---------- @@ -535,8 +527,8 @@ def get_tra_flows(df_plenty): return df[tra_alias.keys()] -def get_plenty_data(fp, center_average_values=None, sanity_checks=True): - """Returns the Plenty data from the file `fp`. +def get_plenty_data(fp, center_average_values=None, *, sanity_checks=True): + """Return the Plenty data from the file `fp`. Parameters ---------- @@ -553,21 +545,35 @@ def get_plenty_data(fp, center_average_values=None, sanity_checks=True): Plenty data """ data = pd.read_excel(fp, skiprows=9, index_col="ophaal tijdstip", na_values=["EOF"]) + data_index = pd.DatetimeIndex(data.index) + data.index = data_index config_df = pd.read_excel(fp, skiprows=0, nrows=5, header=None, usecols=[0, 1, 3]) - assert config_df.iloc[4, 0] == "gemiddelde ?", "Unable to read configuration. Set `center_average_values` manually" - is_dagsom = config_df.iloc[0, 2] == 5.0 - assert not is_dagsom, "Dagsom not supported. In other parts flow units are assumed instead of dagsom's 'm3'." + if config_df.iloc[4, 0] != "gemiddelde ?": + msg = "Unable to read configuration. Set `center_average_values` manually" + raise ValueError(msg) + is_dagsom = config_df.iloc[0, 2] == PLENTY_DAILY_SUM_FLAG + if is_dagsom: + msg = "Dagsom not supported. In other parts flow units are assumed instead of dagsom's 'm3'." + raise ValueError(msg) if sanity_checks: # check config is in sync with the data timedelta_config = pd.Timedelta(f"{config_df.iloc[2, 1]}h") - assert timedelta_config == (data.index[1] - data.index[0]), "Configuration and data are not in sync" - assert timedelta_config == (data.index[-1] - data.index[-2]), "Configuration and data are not in sync" - assert timedelta_config == pd.Timedelta(data.index.inferred_freq), ( - "Unable to infer frequency from data. Missing rows?" - ) - - data[data.abs() > 10000.0] = np.nan + if timedelta_config != (data.index[1] - data.index[0]): + msg = "Configuration and data are not in sync" + raise ValueError(msg) + if timedelta_config != (data.index[-1] - data.index[-2]): + msg = "Configuration and data are not in sync" + raise ValueError(msg) + inferred_freq = pd.infer_freq(data_index) + if inferred_freq is None: + msg = "Unable to infer frequency from data. Missing rows?" + raise ValueError(msg) + if timedelta_config != pd.Timedelta(inferred_freq): + msg = "Unable to infer frequency from data. Missing rows?" + raise ValueError(msg) + + data[data.abs() > PLENTY_MAX_ABS_VALUE] = np.nan if center_average_values is None: is_avg = config_df.iloc[4, 1] == "ja" diff --git a/dawacotools/plot.py b/dawacotools/plot.py index 179f106..0c1c095 100644 --- a/dawacotools/plot.py +++ b/dawacotools/plot.py @@ -1,7 +1,6 @@ """Plotting functions for visualizing groundwater and geological data.""" -import datetime - +import contextily as ctx import matplotlib.pyplot as plt import numpy as np import xarray as xr @@ -26,15 +25,28 @@ ) # locale.setlocale(locale.LC_ALL, "nl_NL") +BORING_COMPONENT_WIDTHS = { + 1: [0, 1], + 2: [0, 0.75, 1], + 3: [0, 0.5, 0.75, 1], + 4: [0, 0.4, 0.6, 0.8, 1], +} +MIN_GWS_OBSERVATIONS_FOR_MAP = 10 def plot_daw_triwaco(df, ax, zlim=-60): + """Plot DAWACO Triwaco layer information on an axis.""" if len(df) == 0: return d = df.copy() + if isinstance(zlim, tuple | list): + zmin, zmax = zlim + else: + zmin = float(zlim) + zmax = float(d["Maaiveld"].iloc[0]) - d["okp_nap"].fillna(-999.0, inplace=True) + d["okp_nap"] = d["okp_nap"].fillna(-999.0) ax.xaxis.set_visible(False) patches_lay = [] @@ -43,7 +55,7 @@ def plot_daw_triwaco(df, ax, zlim=-60): top = lay.bkp_nap bottom = lay.okp_nap - if top < zlim: + if top < zmin: continue c = (0, 146 / 255, 0) if lay.Type_pak == "S" else [i / 255 for i in (243, 225, 6)] @@ -62,23 +74,27 @@ def plot_daw_triwaco(df, ax, zlim=-60): textstr = lay.Type_pak + " " + str(lay.Num_pak) ax.text( 0.5, - (top + max(bottom, zlim)) / 2, + (top + max(bottom, zmin)) / 2, textstr, fontsize=6, verticalalignment="center", ha="center", ) - ax.add_collection(PatchCollection(patches_lay, match_original=True, edgecolors="none")) - ax.set_ylim((zlim, lay.Maaiveld)) + if patches_lay: + ax.add_collection(PatchCollection(patches_lay, match_original=True, edgecolors="none")) + ax.set_ylim((zmin, zmax)) ax.set_title("Triwaco") def plot_daw_boring(dfi, ax): + """Plot DAWACO boring lithology information on an axis.""" if len(dfi) == 0: return - assert "Maaiveld" in dfi, "Obtain dfi with get_daw_boring using join_with_mps=True" + if "Maaiveld" not in dfi: + msg = "Obtain dfi with get_daw_boring using join_with_mps=True" + raise ValueError(msg) legend_handles = [] legend_names = [] @@ -102,15 +118,8 @@ def plot_daw_boring(dfi, ax): legend_names.append("Niet verwerkt") continue - if ncomp / 2 == 1: - breedten = [0, 1] - elif ncomp / 2 == 2: - breedten = [0, 0.75, 1] - elif ncomp / 2 == 3: - breedten = [0, 0.5, 0.75, 1] - elif ncomp / 2 == 4: - breedten = [0, 0.4, 0.6, 0.8, 1] - else: + breedten = BORING_COMPONENT_WIDTHS.get(ncomp // 2) + if breedten is None: ph = ax.add_patch(Polygon([(0, bottom), (1, bottom), (1, top), (0, top)], facecolor="red")) legend_handles.append(ph) legend_names.append("Niet verwerkt") @@ -120,6 +129,7 @@ def plot_daw_boring(dfi, ax): [li.Nencode[i : i + 2] for i in range(0, ncomp, 2)], breedten[:-1], breedten[1:], + strict=True, ): try: bdi = boorlegenda_dawaco[code[0]] @@ -172,6 +182,7 @@ def plot_daw_boring(dfi, ax): def plot_nlmod_k(xcoord, ycoord, fp_model_ds, ax, zlim=None): + """Plot horizontal and vertical hydraulic conductivity from an nlmod dataset.""" model_ds = xr.open_dataset(fp_model_ds) iicell2d_nearest = np.argmin((model_ds.x.values - xcoord) ** 2 + (model_ds.y.values - ycoord) ** 2) @@ -189,7 +200,7 @@ def plot_nlmod_k(xcoord, ycoord, fp_model_ds, ax, zlim=None): tops = np.concatenate(([top_nearest], botm_nearest[:-1]))[~np.isnan(kh_nearest)] botms = botm_nearest[~np.isnan(kh_nearest)] - y_k = np.array([item for sublist in zip(tops, botms) for item in sublist]) + y_k = np.array([item for sublist in zip(tops, botms, strict=True) for item in sublist]) x_kh = np.repeat(kh_nearest[~np.isnan(kh_nearest)], 2) x_kv = np.repeat(kv_nearest[~np.isnan(kv_nearest)], 2) @@ -197,7 +208,7 @@ def plot_nlmod_k(xcoord, ycoord, fp_model_ds, ax, zlim=None): x_label = (x_kh[::2] + x_kv[1::2]) / 2 y_label = (y_k[::2] + y_k[1::2]) / 2 - for li, xi, yi in zip(labels, x_label, y_label): + for li, xi, yi in zip(labels, x_label, y_label, strict=True): ax.annotate( li.item(), (xi, yi), @@ -209,7 +220,7 @@ def plot_nlmod_k(xcoord, ycoord, fp_model_ds, ax, zlim=None): ) ax.plot(x_kh, y_k, c="C0", ls="-") - ax.set_xlabel("Kv (m/dag; blauw)") + ax.set_xlabel("Kh (m/dag; blauw)") ax.set_ylabel("mNAP") ax2 = ax.twiny() ax2.plot(x_kv, y_k, c="C1", ls="-.") @@ -220,11 +231,12 @@ def plot_nlmod_k(xcoord, ycoord, fp_model_ds, ax, zlim=None): def plot_daw_filters(filters, ax, linewidth_buis=5, linewidth_filter=10): + """Plot DAWACO filter screen positions on an axis.""" xlim = ax.get_xlim() dx = 1 / (len(filters) + 1) * (xlim[1] - xlim[0]) - for irow, (_mpcode, row) in enumerate(filters.iterrows()): + for irow, (_index, row) in enumerate(filters.iterrows()): x = (irow + 1) * dx + xlim[0] ax.plot( @@ -233,7 +245,7 @@ def plot_daw_filters(filters, ax, linewidth_buis=5, linewidth_filter=10): c="k", linewidth=linewidth_buis, ) - gws = get_daw_ts_stijghgt(mpcode=row.name, filternr=row.Filtnr) + gws = get_daw_ts_stijghgt(mpcode=row.MpCode, filternr=row.Filtnr) ax.plot( [x, x], [gws.max(), gws.min()], @@ -268,6 +280,7 @@ def plot_daw_mp_map( mpcode=None, mps=None, ax=None, + *, soort=None, annotate_mpcode=True, marker=None, @@ -276,7 +289,15 @@ def plot_daw_mp_map( map_type="satelite", **kwargs, ): - import contextily as ctx + """Plot a DAWACO monitoring point map with optional labels and basemap.""" + if ax is None: + _, ax = plt.subplots() + if mps is None: + msg = "Provide monitoring point metadata with mps" + raise ValueError(msg) + if mpcode is None: + msg = "Provide mpcode" + raise ValueError(msg) xlim = ax.get_xlim() ylim = ax.get_ylim() @@ -300,7 +321,10 @@ def plot_daw_mp_map( ctx.add_basemap(ax=ax, crs="EPSG:28992", source=source, zoom=18) else: - ctx.add_basemap(ax=ax, crs="EPSG:28992", source=ctx.providers.nlmaps.standaard, zoom=18) + provider_group = "nlmaps" + provider_name = "standaard" + source = getattr(getattr(ctx.providers, provider_group), provider_name) + ctx.add_basemap(ax=ax, crs="EPSG:28992", source=source, zoom=18) ax.set_xlim(xlim) ax.set_ylim(ylim) @@ -321,20 +345,21 @@ def plot_daw_mp_map( "Pompput": "X", "Opp.water meetpunt": "^", "Monsterpunt": "s", + "Point of interest": "d", "4": "d", "Infiltratieput": "+", } for soort_iter in mps.Soort.unique(): - m = marker[soort_iter] - ax = mps.plot(marker=m, ax=ax, color=color, label=soort_iter, **kwargs) + m = marker.get(soort_iter, "o") + ax = mps[mps.Soort == soort_iter].plot(marker=m, ax=ax, color=color, label=soort_iter, **kwargs) if annotate_mpcode: - for mpcode, x, y in zip(mps.index, mps.geometry.x, mps.geometry.y): - mp_label = mpcode[4:] + for mpcode_iter, x, y in zip(mps.index, mps.geometry.x, mps.geometry.y, strict=True): + mp_label = mpcode_iter[4:] - if text_dict is not None and mpcode in text_dict: - mp_label += text_dict[mpcode] + if text_dict is not None and mpcode_iter in text_dict: + mp_label += text_dict[mpcode_iter] ax.annotate( mp_label, @@ -351,16 +376,27 @@ def plot_daw_mp_map( def plot_daw_map_gws(filters, vkey="val", vmin=-1.0, vmax=1.0, ax=None, colormap="viridis"): - gwss = [get_daw_ts_stijghgt(mpcode=mpcode, filternr=filter.Filtnr) for mpcode, filter in filters.iterrows()] - gwss = [gws["2017-01-01":] for gws in gwss if gws["2017-01-01":].size > 10] - gwsmeds = {gws.name: gws.median() for gws in gwss} - filtmeds = filters.loc[gwsmeds.keys()] - filtmeds["gwsmed"] = gwsmeds.values() - h = filtmeds.plot.scatter(x="Xcoor", y="Ycoor", c="gwsmed", colormap=colormap, vmin=vmin, vmax=vmax) + """Plot median groundwater levels for filters on a map.""" + if ax is None: + _, ax = plt.subplots() + + filter_positions = [] + gwsmeds = [] + for position, (_index, filter_row) in enumerate(filters.iterrows()): + gws = get_daw_ts_stijghgt(mpcode=filter_row.MpCode, filternr=filter_row.Filtnr)["2017-01-01":] + if gws.size > MIN_GWS_OBSERVATIONS_FOR_MAP: + filter_positions.append(position) + gwsmeds.append(gws.median()) + + filtmeds = filters.iloc[filter_positions].copy() + value_column = "gwsmed" if vkey == "val" else vkey + filtmeds[value_column] = gwsmeds + h = filtmeds.plot.scatter(x="Xcoor", y="Ycoor", c=value_column, colormap=colormap, vmin=vmin, vmax=vmax, ax=ax) return filtmeds, h -def plot_nlmod_vertical_profile(model_ds, ax, x, y, label, mark_inactive=True, **line_plot_kwargs): +def plot_nlmod_vertical_profile(model_ds, ax, x, y, label, *, mark_inactive=True, **line_plot_kwargs): + """Plot a vertical profile for a variable in an nlmod dataset.""" data = get_nlmod_vertical_profile(model_ds, x, y, label, active_only=True) yplot = data[:2].T.reshape(-1) xplot = data[2].repeat(2) @@ -378,6 +414,7 @@ def plot_nlmod_vertical_profile(model_ds, ax, x, y, label, mark_inactive=True, * def plot_regis_lay(rds_x, rds_y, ax, zlim=-60): + """Plot REGIS layer names as a vertical column.""" keys = ["layer", "bottom", "top"] dsi_r2 = get_regis_ds(rds_x, rds_y, keys=keys) @@ -409,14 +446,17 @@ def plot_regis_lay(rds_x, rds_y, ax, zlim=-60): ) ax.add_collection(PatchCollection(patches_lay, match_original=True, edgecolors="none")) - ax.set_ylim((zlim, dsi_r2.top.max())) + ax.set_ylim((zlim, float(dsi_r2.top.max()))) ax.legend(handles=patches_lay, loc="lower left") ax.set_title("REGIS v2.2") def plot_daw_mp(mpcode, fp_model_ds=None, dy_map=50.0, map_type="satelite"): + """Plot a DAWACO monitoring point overview figure.""" filters = get_daw_filters(mpcode) - assert filters.size > 0, f"Geen filters voor mpcode: {mpcode}" + if filters.size == 0: + msg = f"Geen filters voor mpcode: {mpcode}" + raise ValueError(msg) x, y, mv = filters.iloc[0][["Xcoor", "Ycoor", "Maaiveld"]] fig = plt.figure(figsize=(30, 20)) @@ -443,9 +483,14 @@ def plot_daw_mp(mpcode, fp_model_ds=None, dy_map=50.0, map_type="satelite"): bbox = ax_map.get_window_extent().transformed(fig.dpi_scale_trans.inverted()) aspect = bbox.width / bbox.height - extent_map = [x - dy_map * aspect, x + dy_map * aspect, y - dy_map, y + dy_map] - ax_map.set_xlim(extent_map[:2]) - ax_map.set_ylim(extent_map[2:]) + extent_map = ( + float(x - dy_map * aspect), + float(x + dy_map * aspect), + float(y - dy_map), + float(y + dy_map), + ) + ax_map.set_xlim((extent_map[0], extent_map[1])) + ax_map.set_ylim((extent_map[2], extent_map[3])) # plot soils columns df_bor = get_daw_boring(mpcode=mpcode, join_with_mps=True) @@ -478,7 +523,7 @@ def plot_daw_mp(mpcode, fp_model_ds=None, dy_map=50.0, map_type="satelite"): # plot map # mps_map = get_daw_mps().cx[extent_map[0]:extent_map[1], extent_map[2]:extent_map[3]] Bevat vervallen! filters_map = get_daw_filters().cx[extent_map[0] : extent_map[1], extent_map[2] : extent_map[3]] - mps_map = filters_map.reset_index().groupby("MpCode").agg(lambda x: x.iloc[0]) # for multiple occurence of mpcode + mps_map = filters_map.drop_duplicates("MpCode").set_index("MpCode", drop=False) mps_map = df2gdf(mps_map) plot_daw_mp_map( mpcode=mpcode, @@ -495,9 +540,12 @@ def plot_daw_mp(mpcode, fp_model_ds=None, dy_map=50.0, map_type="satelite"): # plot gws ts distances = mps_map.distance(filters.iloc[0].geometry) - filters_near = filters_map.loc[distances[mps_map.StygMeting.notna()].sort_values()[:4].index] + has_gws = mps_map["StygMeting"].notna() if "StygMeting" in mps_map else np.full(len(mps_map), True) + nearby_mpcodes = distances[has_gws].sort_values().head(4).index + filters_near = filters_map[filters_map["MpCode"].isin(nearby_mpcodes)] - for mpc, f in filters_near.iterrows(): + for _index, f in filters_near.iterrows(): + mpc = f.MpCode distance = distances.loc[mpc] label = f"{mpc}-F{f.Filtnr!s}" if distance == 0.0 else f"{mpc}-F{f.Filtnr!s} r={distance:.0f}m" @@ -524,20 +572,26 @@ def plot_daw_mp(mpcode, fp_model_ds=None, dy_map=50.0, map_type="satelite"): def plot_knmi_meteo(ax_ts_meteo, x, y, tmin=None, tmax=None): + """Plot nearest precipitation and evaporation time series for the visible range.""" + if tmin is None or tmax is None: + msg = "Provide tmin and tmax" + raise ValueError(msg) + tmin_range, tmax_range = dates.num2date(tmin), dates.num2date(tmax) tmin_range, tmax_range = ( - datetime.datetime(tmin_range.year, tmin_range.month, tmin_range.day), - datetime.datetime(tmax_range.year, tmax_range.month, tmax_range.day), + tmin_range.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None), + tmax_range.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None), ) - N = get_daw_meteo_from_loc(x=x, y=y, mettype="Neerslag", start_date=tmin_range, end_date=tmax_range)[0] - V = get_daw_meteo_from_loc(x=x, y=y, mettype="Verdamping", start_date=tmin_range, end_date=tmax_range)[0] - N.plot(ax=ax_ts_meteo) - V.plot(ax=ax_ts_meteo) + precipitation = get_daw_meteo_from_loc(x=x, y=y, mettype="Neerslag", start_date=tmin_range, end_date=tmax_range)[0] + evaporation = get_daw_meteo_from_loc(x=x, y=y, mettype="Verdamping", start_date=tmin_range, end_date=tmax_range)[0] + precipitation.plot(ax=ax_ts_meteo) + evaporation.plot(ax=ax_ts_meteo) ax_ts_meteo.legend(fontsize=6) - ax_ts_meteo.set_xlim([tmin, tmax]) + ax_ts_meteo.set_xlim((tmin, tmax)) def plot_regis_kh(rds_x, rds_y, ax, zlim=-60): + """Plot REGIS horizontal hydraulic conductivity ranges.""" keys = ["kh", "sdh", "bottom", "top"] dsi_r2 = get_regis_ds(rds_x, rds_y, keys=keys) @@ -576,6 +630,7 @@ def plot_regis_kh(rds_x, rds_y, ax, zlim=-60): def plot_regis_kv(rds_x, rds_y, ax, zlim=-60): + """Plot REGIS vertical hydraulic conductivity ranges.""" keys = ["kv", "sdv", "bottom", "top"] dsi_r2 = get_regis_ds(rds_x, rds_y, keys=keys) diff --git a/pyproject.toml b/pyproject.toml index a474a66..aef5084 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,16 +56,16 @@ test = [ "pytest-cov>=2.3.1", "pytest-xdist>=3.0.0", "ruff==0.15.6", - "ty", - "validate-pyproject[all,store]", + "ty>=0.0.42", + "validate-pyproject[all,store]>=0.25", ] dev = [ "pytest>=6.2.5", "pytest-cov>=2.3.1", "pytest-xdist>=3.0.0", "ruff==0.15.6", - "ty", - "validate-pyproject[all,store]", + "ty>=0.0.42", + "validate-pyproject[all,store]>=0.25", ] [project.urls] diff --git a/ruff.toml b/ruff.toml index a56055b..deb56ff 100644 --- a/ruff.toml +++ b/ruff.toml @@ -40,7 +40,6 @@ preview = true extend-select = ["D"] [lint.per-file-ignores] -"*" = ["T201"] "tests/**/*.py" = [ "D", "PLR2004", diff --git a/tests/conftest.py b/tests/conftest.py index a0955a0..eae8cac 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,7 +57,9 @@ def _private_database_url(): @pytest.fixture(scope="session") def mock_dawaco_engine(tmp_path_factory): database_path = tmp_path_factory.mktemp("dawaco") / "mock_dawaco.sqlite" - return build_mock_dawaco_database(database_path) + engine = build_mock_dawaco_database(database_path) + yield engine + engine.dispose() @pytest.fixture(autouse=True) diff --git a/tests/mock_dawaco.py b/tests/mock_dawaco.py index 9f7434f..1c593b3 100644 --- a/tests/mock_dawaco.py +++ b/tests/mock_dawaco.py @@ -123,6 +123,8 @@ def _write_groundwater_levels(connection: Connection) -> None: {"filtrec": 101, "datum": "2020-01-06", "tijd": "00:00", "meting_nap": 1.40, "Temp": 9.0}, {"filtrec": 102, "datum": "2020-01-01", "tijd": "00:00", "meting_nap": 0.50, "Temp": 7.5}, {"filtrec": 103, "datum": "2020-01-01", "tijd": "00:00", "meting_nap": 0.75, "Temp": 7.0}, + {"filtrec": 103, "datum": "2020-01-03", "tijd": "00:00", "meting_nap": 0.95, "Temp": 7.2}, + {"filtrec": 103, "datum": "2020-01-06", "tijd": "00:00", "meting_nap": 1.35, "Temp": 7.4}, ]).to_sql("Stijghgt", connection, index=False, if_exists="replace") @@ -159,9 +161,9 @@ def _write_boring(connection: Connection) -> None: def _write_triwaco(connection: Connection) -> None: pd.DataFrame([ + {"MpCode": "MOCK001", "Bk_pak": 15.0, "Type_pak": "C", "Num_pak": 3}, {"MpCode": "MOCK001", "Bk_pak": 0.0, "Type_pak": "A", "Num_pak": 1}, {"MpCode": "MOCK001", "Bk_pak": 5.0, "Type_pak": "B", "Num_pak": 2}, - {"MpCode": "MOCK001", "Bk_pak": 15.0, "Type_pak": "C", "Num_pak": 3}, ]).to_sql("HydStrat", connection, index=False, if_exists="replace") pd.DataFrame([{"Mpcode": "MOCK001", "Maaiveld": 2.0}]).to_sql( "MpMv", diff --git a/tests/test_analysis.py b/tests/test_analysis.py index 13c4b1c..255a14d 100644 --- a/tests/test_analysis.py +++ b/tests/test_analysis.py @@ -13,6 +13,43 @@ def test_remove_outliers_replaces_values_outside_threshold(): assert result.iloc[:3].notna().all() +def test_compute_residence_time_uses_datetime_elapsed_days_for_explicit_pore_volume(): + index = pd.date_range("2020-01-01", periods=5, freq="12h") + flow = pd.Series(12.0, index=index) + + result = dt.analysis.compute_residence_time(flow, pore_volume_reservoir=6.0) + + expected = pd.Series(0.5, index=index) + pd.testing.assert_series_equal(result, expected) + + +def test_compute_residence_time_can_derive_pore_volume_from_average_residence_time(): + index = pd.date_range("2020-01-01", periods=5, freq="D") + flow = pd.Series(8.0, index=index) + + result = dt.analysis.compute_residence_time(flow, average_residence_time=1.25) + + expected = pd.Series(1.25, index=index) + pd.testing.assert_series_equal(result, expected) + + +def test_compute_residence_time_supports_flow_direction_and_retardation(): + index = pd.date_range("2020-01-01", periods=5, freq="D") + flow = pd.Series(10.0, index=index) + + extraction = dt.analysis.compute_residence_time(flow, pore_volume_reservoir=20.0) + infiltration = dt.analysis.compute_residence_time( + flow, + pore_volume_reservoir=20.0, + extraction_infiltration="infiltration", + ) + retarded = dt.analysis.compute_residence_time(flow, pore_volume_reservoir=20.0, retardation_factor=1.5) + + pd.testing.assert_series_equal(extraction, pd.Series(2.0, index=index)) + pd.testing.assert_series_equal(infiltration, pd.Series(2.0, index=index)) + pd.testing.assert_series_equal(retarded, pd.Series(3.0, index=index)) + + def test_potential_to_flow_uses_filter_metadata_and_static_heads(): result = dt.potential_to_flow( mpcode1="MOCK001", @@ -32,6 +69,63 @@ def test_potential_to_flow_uses_filter_metadata_and_static_heads(): pd.testing.assert_series_equal(result["duration"], pd.Series([1.25]), check_names=False) +def test_potential_to_flow_resamples_dynamic_head_series(): + result = dt.potential_to_flow( + mpcode1="MOCK001", + filternr1=1, + mpcode2="MOCK002", + filternr2=1, + hydraulic_conductivity=10.0, + porosity=0.25, + ) + + index = pd.date_range("2020-01-01", "2020-01-06", freq="D") + expected_gradient = pd.Series([0.05, 0.05, np.nan, np.nan, np.nan, 0.01], index=index) + expected_duration = pd.Series([2.5, 2.5, np.nan, np.nan, np.nan, 12.5], index=index) + + pd.testing.assert_series_equal(result["gradient"], expected_gradient, check_freq=False, check_names=False) + pd.testing.assert_series_equal(result["duration"], expected_duration, check_freq=False, check_names=False) + + +def test_get_well_info_uses_mock_database_and_current_filter_columns(monkeypatch): + meteo_calls = [] + regis_calls = [] + + def fake_meteo_from_loc(*, x, y, mettype, start_date, end_date): + meteo_calls.append({ + "x": x, + "y": y, + "mettype": mettype, + "start_date": start_date, + "end_date": end_date, + }) + return pd.Series([1.0], index=pd.DatetimeIndex([start_date]), name=mettype), [("MOCK", 0.0, None)] + + regis = {"source": "synthetic-regis"} + + def fake_regis_ds(x, y, keys=None): + regis_calls.append((x, y, keys)) + return regis + + monkeypatch.setattr(dt.analysis, "get_daw_meteo_from_loc", fake_meteo_from_loc) + monkeypatch.setattr(dt.analysis, "get_regis_ds", fake_regis_ds) + + info = dt.get_well_info(mpcode="MOCK001", filternr=1) + + assert info["filter_metadata"].iloc[0]["MpCode"] == "MOCK001" + assert info["filter_metadata"].iloc[0]["Filtnr"] == 1 + assert info["gws"].loc["2020-01-01"] == 1.0 + assert info["gwt"].loc["2020-01-01"] == 8.0 + assert list(info["boring"].index.unique()) == ["MOCK001"] + assert list(info["triwaco"]["Mpcode"].unique()) == ["MOCK001"] + assert info["regis"] is regis + assert regis_calls == [(100000.0, 500000.0, None)] + assert {call["mettype"] for call in meteo_calls} == set(dt.analysis.meteo_pars) + assert all(call["x"] == 100000.0 and call["y"] == 500000.0 for call in meteo_calls) + assert all(call["start_date"] == pd.Timestamp("2020-01-01") for call in meteo_calls) + assert all(call["end_date"] == pd.Timestamp("2020-01-06") for call in meteo_calls) + + def test_get_cluster_mps_finds_nearby_monitoring_points(): mps = dt.get_daw_mps() diff --git a/tests/test_io.py b/tests/test_io.py index e866408..d3332bf 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -1,5 +1,6 @@ import numpy as np import pandas as pd +import pytest import dawacotools as dt from dawacotools import io as dawaco_io @@ -44,6 +45,12 @@ def test_get_daw_mps_supports_exact_and_partial_matching(): assert list(partial_list.index) == ["MOCK001", "MOCK010"] +def test_get_daw_mps_binds_mpcode_values_safely(): + injected = dt.get_daw_mps(mpcode="MOCK001' OR '1'='1", partial_match_mpcode=False) + + assert injected.empty + + def test_get_daw_filters_excludes_expired_filters_by_default(): filters = dt.get_daw_filters() @@ -70,6 +77,16 @@ def test_get_daw_filters_supports_filter_and_expired_selection(): assert with_expired.iloc[0]["Verval_datum"] == pd.Timestamp("2020-01-01") +def test_get_daw_filters_validates_scalar_and_list_filternr_consistently(): + assert dt.get_daw_filters(mpcode="MOCK001", filternr=0).empty + assert dt.get_daw_filters(mpcode="MOCK001", filternr=[0]).empty + + with pytest.raises(ValueError, match="filternr must be a non-negative integer-like value"): + dt.get_daw_filters(mpcode="MOCK001", filternr=-1) + with pytest.raises(ValueError, match="filternr must be a non-negative integer-like value"): + dt.get_daw_filters(mpcode="MOCK001", filternr=[1, -1]) + + def test_get_daw_filters_can_return_hydropandas_metadata_shape(): filters = dt.get_daw_filters(mpcode="MOCK001", filternr=1, return_hpd=True) @@ -92,10 +109,7 @@ def test_get_daw_ts_stijghgt_masks_sentinel_values_and_inserts_expected_gaps(): assert list(series.index) == list(pd.date_range("2020-01-01", "2020-01-06", freq="D")) assert series.name == "mNAP" - assert series.loc["2020-01-01"] == 1.0 - assert np.isnan(series.loc["2020-01-03"]) - assert np.isnan(series.loc["2020-01-04"]) - assert np.isnan(series.loc["2020-01-05"]) + np.testing.assert_allclose(series.to_numpy(), [1.0, 1.1, np.nan, np.nan, np.nan, 1.4], equal_nan=True) def test_get_daw_ts_temp_masks_zero_and_sentinel_values(): @@ -103,10 +117,37 @@ def test_get_daw_ts_temp_masks_zero_and_sentinel_values(): assert list(series.index) == list(pd.date_range("2020-01-01", "2020-01-06", freq="D")) assert series.name == "MOCK001_1" - assert series.loc["2020-01-01"] == 8.0 - assert np.isnan(series.loc["2020-01-02"]) - assert np.isnan(series.loc["2020-01-05"]) - assert series.loc["2020-01-06"] == 9.0 + np.testing.assert_allclose(series.to_numpy(), [8.0, np.nan, np.nan, np.nan, np.nan, 9.0], equal_nan=True) + + +def test_get_daw_ts_stijghgt_binds_public_inputs_safely(): + with pytest.raises(ValueError, match="not in Dawaco"): + dt.get_daw_ts_stijghgt(mpcode="MOCK001' OR '1'='1", filternr=1) + with pytest.raises(ValueError, match="filternr must be a non-negative integer-like value"): + dt.get_daw_ts_stijghgt(mpcode="MOCK001", filternr="1 OR 1=1") + + +def test_identify_data_gaps_preserves_order_when_filling_multiple_gaps(): + index = pd.to_datetime([ + "2020-01-01", + "2020-01-02", + "2020-01-05", + "2020-01-06", + "2020-01-07", + "2020-01-10", + "2020-01-11", + ]) + series = pd.Series(np.arange(len(index), dtype=float), index=index, name="synthetic") + + filled = dawaco_io.identify_data_gaps(series) + + assert filled.index.is_monotonic_increasing + assert list(filled.index) == list(pd.date_range("2020-01-01", "2020-01-11", freq="D")) + assert np.isnan(filled.loc["2020-01-03"]) + assert np.isnan(filled.loc["2020-01-04"]) + assert np.isnan(filled.loc["2020-01-08"]) + assert np.isnan(filled.loc["2020-01-09"]) + assert filled.loc["2020-01-10"] == 5.0 def test_get_daw_ts_meteo_unpivots_month_columns_and_drops_missing_values(): @@ -114,7 +155,27 @@ def test_get_daw_ts_meteo_unpivots_month_columns_and_drops_missing_values(): assert series.name == "Station 235W - Neerslag" assert list(series.index) == [pd.Timestamp("2020-01-01"), pd.Timestamp("2020-01-03")] - assert list(series.values) == [1.0, 3.0] + np.testing.assert_allclose(series.to_numpy(), [1.0, 3.0], equal_nan=True) + + +def test_get_daw_meteo_from_loc_raises_clear_error_without_covering_station(): + with pytest.raises(ValueError, match="No meteo station covers Neerslag"): + dt.get_daw_meteo_from_loc( + x=100000, + y=500000, + mettype="Neerslag", + start_date="1800-01-01", + end_date="1800-01-31", + ) + + +def test_get_daw_meteo_arr_daterange_returns_reconstructed_ranges(): + dateranges = dawaco_io.get_daw_meteo_arr_daterange() + + assert list(dateranges.columns) == dawaco_io.meteo_header + station = dateranges.set_index("statcode").loc["235W"] + assert station["N_start"] == pd.Timestamp("2020-01-01") + assert station["N_end"] == pd.Timestamp("2020-01-03") def test_get_daw_boring_and_triwaco_use_mock_database(): diff --git a/tests/test_io_plenty.py b/tests/test_io_plenty.py new file mode 100644 index 0000000..de7dfd4 --- /dev/null +++ b/tests/test_io_plenty.py @@ -0,0 +1,83 @@ +import numpy as np +import pandas as pd +import pytest + +from dawacotools import io_plenty + + +@pytest.mark.parametrize( + ("tag", "start", "end"), + [ + ("CAWP02", "2016-01-01", "2019-01-01"), + ("CAWP05", "2016-02-01", "2018-01-01"), + ("CAWQ01", "2012-01-01", "2013-01-01"), + ("HEW906", "2016-01-01", "2018-01-01"), + ("HEWPAF", "2012-01-01", "2013-01-01"), + ("BEWARU", "2004-01-01", "2005-01-01"), + ("BEWBRU", "2016-01-01", "2018-01-01"), + ], +) +def test_sec_pa_flow_date_masks_use_totalizer_only_inside_affected_interval(tag, start, end): + start = pd.Timestamp(start) + end = pd.Timestamp(end) + index = pd.DatetimeIndex([start - pd.Timedelta(days=1), start, end]) + data = { + f"{tag}_FQ10R": 1.0, + f"{tag}_FT10": 100.0, + } + if tag in {"CAWP02", "CAWP05", "CAWQ01", "HEW906"}: + data[f"{tag}_FQ11R"] = 10.0 + expected = np.array([-44.0, -140.0, -44.0]) + else: + expected = np.array([-4.0, -100.0, -4.0]) + df_plenty = pd.DataFrame(data, index=index) + + actual = df_plenty.eval(io_plenty.secs_pa_flow[tag]).to_numpy() + + np.testing.assert_allclose(actual, expected) + + +@pytest.mark.parametrize("flow_func", [io_plenty.get_flow, io_plenty.get_flows]) +def test_get_flow_returns_1d_array_for_all_pump_metadata(flow_func): + metadata = pd.DataFrame( + { + "MpCode": ["19CNPCP 1", "19CNPCP 3"], + "Filtnr": [0, 0], + }, + ) + df_plenty = pd.DataFrame( + { + "CAWP01_FQ10R": [1.0], + "CAWP01_FQ11R": [2.0], + "CAWP03_FQ10R": [3.0], + "CAWP03_FQ11R": [4.0], + }, + index=pd.DatetimeIndex(["2020-01-01"]), + ) + + flow = flow_func(metadata, df_plenty) + + assert flow.shape == (2,) + np.testing.assert_allclose(flow, [-12.0, -28.0]) + + +@pytest.mark.parametrize("flow_func", [io_plenty.get_flow, io_plenty.get_flows]) +def test_get_flow_returns_1d_array_for_mixed_pump_and_non_pump_metadata(flow_func): + metadata = pd.DataFrame( + { + "MpCode": ["19CNPCP 1", "OBS001"], + "Filtnr": [0, 1], + }, + ) + df_plenty = pd.DataFrame( + { + "CAWP01_FQ10R": [1.0], + "CAWP01_FQ11R": [2.0], + }, + index=pd.DatetimeIndex(["2020-01-01"]), + ) + + flow = flow_func(metadata, df_plenty) + + assert flow.shape == (2,) + np.testing.assert_allclose(flow, [-12.0, np.nan], equal_nan=True) diff --git a/tests/test_plot.py b/tests/test_plot.py new file mode 100644 index 0000000..f0501fd --- /dev/null +++ b/tests/test_plot.py @@ -0,0 +1,111 @@ +import contextily as ctx +import matplotlib as mpl + +mpl.use("Agg") + +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import xarray as xr + +import dawacotools as dt +from dawacotools import plot as dawaco_plot + + +def test_plot_daw_mp_uses_filter_mpcode_column_and_triwaco_tuple_zlim(monkeypatch): + def fake_get_daw_boring(*_args, **_kwargs): + return pd.DataFrame() + + def fake_plot_daw_mp_map(**kwargs): + return kwargs["ax"] + + def fake_plot_knmi_meteo(*_args, **_kwargs): + return None + + def fake_get_daw_mon_dates(*_args, **_kwargs): + return [] + + monkeypatch.setattr(dawaco_plot, "get_daw_boring", fake_get_daw_boring) + monkeypatch.setattr(dawaco_plot, "plot_daw_mp_map", fake_plot_daw_mp_map) + monkeypatch.setattr(dawaco_plot, "plot_knmi_meteo", fake_plot_knmi_meteo) + monkeypatch.setattr(dawaco_plot, "get_daw_mon_dates", fake_get_daw_mon_dates) + gws_index = pd.date_range("2020-01-01", periods=2, freq="D") + + def fake_get_daw_ts_stijghgt(mpcode, filternr): + assert isinstance(mpcode, str) + assert filternr in {1, 2} + return pd.Series([1.0, 1.1], index=gws_index, name="mNAP") + + monkeypatch.setattr(dawaco_plot, "get_daw_ts_stijghgt", fake_get_daw_ts_stijghgt) + + fig = dawaco_plot.plot_daw_mp("MOCK001", map_type="satelite") + + assert fig.axes + plt.close(fig) + + +def test_plot_daw_map_gws_keys_medians_by_filter_row(monkeypatch): + filters = dt.get_daw_filters().reset_index(drop=True) + median_by_filter = { + ("MOCK001", 1): 1.0, + ("MOCK001", 2): 2.0, + ("MOCK002", 1): 3.0, + } + + def fake_get_daw_ts_stijghgt(mpcode, filternr): + value = median_by_filter[(mpcode, filternr)] + index = pd.date_range("2017-01-01", periods=12, freq="D") + return pd.Series(np.full(index.size, value), index=index, name="mNAP") + + monkeypatch.setattr(dawaco_plot, "get_daw_ts_stijghgt", fake_get_daw_ts_stijghgt) + fig, ax = plt.subplots() + + filtmeds, plot_ax = dawaco_plot.plot_daw_map_gws(filters, ax=ax) + + assert plot_ax is ax + assert list(filtmeds[["MpCode", "Filtnr"]].itertuples(index=False, name=None)) == list(median_by_filter.keys()) + np.testing.assert_allclose(filtmeds["gwsmed"], list(median_by_filter.values())) + plt.close(fig) + + +def test_plot_daw_mp_map_supports_point_of_interest_marker(monkeypatch): + def fake_add_basemap(*_args, **_kwargs): + return None + + monkeypatch.setattr(ctx, "add_basemap", fake_add_basemap) + mps = dt.get_daw_mps() + mps.loc["MOCK001", "Soort"] = "Point of interest" + fig, ax = plt.subplots() + ax.set_xlim(99990.0, 100010.0) + ax.set_ylim(499990.0, 500010.0) + + returned_ax = dawaco_plot.plot_daw_mp_map(mpcode="MOCK001", mps=mps, ax=ax, map_type="satelite") + + assert returned_ax is ax + plt.close(fig) + + +def test_plot_nlmod_k_labels_horizontal_conductivity_as_kh(monkeypatch): + model_ds = xr.Dataset( + data_vars={ + "idomain": (("layer", "icell2d"), np.array([[1], [1]])), + "kh": (("layer", "icell2d"), np.array([[10.0], [5.0]])), + "kv": (("layer", "icell2d"), np.array([[1.0], [0.5]])), + "top": ("icell2d", np.array([2.0])), + "botm": (("layer", "icell2d"), np.array([[0.0], [-5.0]])), + }, + coords={ + "layer": np.array([1, 2]), + "icell2d": np.array([0]), + "x": ("icell2d", np.array([100000.0])), + "y": ("icell2d", np.array([500000.0])), + }, + ) + monkeypatch.setattr(dawaco_plot.xr, "open_dataset", lambda _path: model_ds) + fig, ax = plt.subplots() + + dawaco_plot.plot_nlmod_k(100000.0, 500000.0, "model.nc", ax) + + assert ax.get_xlabel() == "Kh (m/dag; blauw)" + assert fig.axes[1].get_xlabel() == "Kv (m/dag; oranje)" + plt.close(fig)