Skip to content

Commit 5bee123

Browse files
Add monitoring for model manager (#37703)
* Add num of models reporting
* Export memory estimate as well
* Fix lint
* Switch to distribution
* Apply suggestion from @gemini-code-assist[bot]
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* Store the namespace instead of strings
---------
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 30518c8 commit 5bee123

3 files changed

Lines changed: 114 additions & 9 deletions

File tree

sdks/python/apache_beam/metrics/metric.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ def counter(
9292
@staticmethod
9393
def distribution(
9494
namespace: Union[Type, str],
95-
name: str) -> 'Metrics.DelegatingDistribution':
95+
name: str,
96+
process_wide: bool = False) -> 'Metrics.DelegatingDistribution':
9697
"""Obtains or creates a Distribution metric.
9798
9899
Distribution metrics are restricted to integer-only distributions.
@@ -105,24 +106,30 @@ def distribution(
105106
A Distribution object.
106107
"""
107108
namespace = Metrics.get_namespace(namespace)
108-
return Metrics.DelegatingDistribution(MetricName(namespace, name))
109+
return Metrics.DelegatingDistribution(
110+
MetricName(namespace, name), process_wide=process_wide)
109111

110112
@staticmethod
111113
def gauge(
112-
namespace: Union[Type, str], name: str) -> 'Metrics.DelegatingGauge':
114+
namespace: Union[Type, str],
115+
name: str,
116+
process_wide: bool = False) -> 'Metrics.DelegatingGauge':
113117
"""Obtains or creates a Gauge metric.
114118
115119
Gauge metrics are restricted to integer-only values.
116120
117121
Args:
118122
namespace: A class or string that gives the namespace to a metric
119123
name: A string that gives a unique name to a metric
124+
process_wide: Whether or not the metric is specific to the current bundle
125+
or should be calculated for the entire process.
120126
121127
Returns:
122-
A Distribution object.
128+
A Gauge object.
123129
"""
124130
namespace = Metrics.get_namespace(namespace)
125-
return Metrics.DelegatingGauge(MetricName(namespace, name))
131+
return Metrics.DelegatingGauge(
132+
MetricName(namespace, name), process_wide=process_wide)
126133

127134
@staticmethod
128135
def string_set(
@@ -210,15 +217,20 @@ def __init__(
210217

211218
class DelegatingDistribution(Distribution):
212219
"""Metrics Distribution Delegates functionality to MetricsEnvironment."""
213-
def __init__(self, metric_name: MetricName) -> None:
220+
def __init__(
221+
self, metric_name: MetricName, process_wide: bool = False) -> None:
214222
super().__init__(metric_name)
215-
self.update = MetricUpdater(cells.DistributionCell, metric_name) # type: ignore[method-assign]
223+
self.update = MetricUpdater(cells.DistributionCell, metric_name, process_wide=process_wide) # type: ignore[method-assign]
216224

217225
class DelegatingGauge(Gauge):
218226
"""Metrics Gauge that Delegates functionality to MetricsEnvironment."""
219-
def __init__(self, metric_name: MetricName) -> None:
227+
def __init__(
228+
self, metric_name: MetricName, process_wide: bool = False) -> None:
220229
super().__init__(metric_name)
221-
self.set = MetricUpdater(cells.GaugeCell, metric_name) # type: ignore[method-assign]
230+
self.set = MetricUpdater( # type: ignore[method-assign]
231+
cells.GaugeCell,
232+
metric_name,
233+
process_wide=process_wide)
222234

223235
class DelegatingStringSet(StringSet):
224236
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""

sdks/python/apache_beam/ml/inference/model_manager.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,11 @@
4545
import torch
4646
from scipy.optimize import nnls
4747

48+
from apache_beam.metrics.metric import Metrics
4849
from apache_beam.utils import multi_process_shared
4950

5051
logger = logging.getLogger(__name__)
52+
_MODEL_MANAGER_METRICS_NAMESPACE = "BeamML_ModelManager"
5153

5254

5355
class GPUMonitor:
@@ -205,6 +207,10 @@ def set_initial_estimate(self, model_tag: str, cost: float):
205207
with self._lock:
206208
self.estimates[model_tag] = cost
207209
self.known_models.add(model_tag)
210+
Metrics.distribution(
211+
_MODEL_MANAGER_METRICS_NAMESPACE,
212+
f"memory_estimate_mb_{model_tag}",
213+
process_wide=True).update(int(cost))
208214
self.logging_info("Initial Profile for %s: %s MB", model_tag, cost)
209215

210216
def add_observation(
@@ -291,6 +297,11 @@ def _solve(self):
291297

292298
self.logging_info(
293299
"Updated Estimate for %s: %.1f MB", model, self.estimates[model])
300+
301+
Metrics.distribution(
302+
_MODEL_MANAGER_METRICS_NAMESPACE,
303+
f"memory_estimate_mb_{model}",
304+
process_wide=True).update(int(self.estimates[model]))
294305
self.logging_info("System Bias: %s MB", bias)
295306

296307
except Exception as e:
@@ -374,6 +385,20 @@ def __init__(
374385

375386
self._monitor.start()
376387

388+
def _update_model_count_metric(self):
389+
for tag, instances in self._models.items():
390+
Metrics.distribution(
391+
_MODEL_MANAGER_METRICS_NAMESPACE,
392+
f"num_loaded_models_{tag}",
393+
process_wide=True).update(len(instances))
394+
395+
def _clear_all_model_metrics(self):
396+
for tag in self._models:
397+
Metrics.distribution(
398+
_MODEL_MANAGER_METRICS_NAMESPACE,
399+
f"num_loaded_models_{tag}",
400+
process_wide=True).update(0)
401+
377402
def logging_info(self, message: str, *args):
378403
if self._verbose_logging:
379404
logger.info(message, *args)
@@ -719,6 +744,7 @@ def _perform_eviction(self, key: str, tag: str, instance: Any, score: int):
719744
self._monitor.reset_peak()
720745
curr, _, _ = self._monitor.get_stats()
721746
self.logging_info("Resource Usage After Eviction: %.1f MB", curr)
747+
self._update_model_count_metric()
722748

723749
def _spawn_new_model(
724750
self,
@@ -741,6 +767,7 @@ def _spawn_new_model(
741767
self._pending_reservations = max(
742768
0.0, self._pending_reservations - est_cost)
743769
self._models[tag].append(instance)
770+
self._update_model_count_metric()
744771
return instance
745772

746773
except Exception as e:
@@ -758,6 +785,7 @@ def _spawn_new_model(
758785
raise e
759786

760787
def _delete_all_models(self):
788+
self._clear_all_model_metrics()
761789
self._idle_lru.clear()
762790
for _, instances in self._models.items():
763791
for instance in instances:

sdks/python/apache_beam/ml/inference/model_manager_test.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
from apache_beam.utils import multi_process_shared
2727

2828
try:
29+
from apache_beam.metrics.execution import MetricsEnvironment
30+
from apache_beam.metrics.metricbase import MetricName
2931
from apache_beam.ml.inference.model_manager import GPUMonitor
3032
from apache_beam.ml.inference.model_manager import ModelManager
3133
from apache_beam.ml.inference.model_manager import ResourceEstimator
@@ -335,6 +337,69 @@ def dummy_loader():
335337
instance = self.manager.acquire_model(model_name, lambda: "model_instance")
336338
self.manager.release_model(model_name, instance)
337339

340+
def test_model_manager_metrics(self):
341+
"""Test that distribution metrics are updated correctly."""
342+
tag1 = "model1"
343+
tag2 = "model2"
344+
345+
def _get_count_dist_max(tag):
346+
dist = MetricsEnvironment.process_wide_container().get_distribution(
347+
MetricName('BeamML_ModelManager', f'num_loaded_models_{tag}'))
348+
return dist.get_cumulative().max
349+
350+
def _get_est_dist_mean(tag):
351+
dist = MetricsEnvironment.process_wide_container().get_distribution(
352+
MetricName('BeamML_ModelManager', f'memory_estimate_mb_{tag}'))
353+
val = dist.get_cumulative()
354+
return int(val.sum / val.count) if val.count > 0 else 0
355+
356+
# Verify that initial estimates correctly export int metrics
357+
self.manager._estimator.set_initial_estimate(tag1, 1000.5)
358+
self.assertEqual(_get_est_dist_mean(tag1), 1000)
359+
360+
self.manager._estimator.set_initial_estimate(tag2, 2000.9)
361+
self.assertEqual(_get_est_dist_mean(tag2), 2000)
362+
363+
# 1. Acquire a model
364+
self.manager.acquire_model(
365+
tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor))
366+
self.assertEqual(_get_count_dist_max(tag1), 1)
367+
self.assertEqual(_get_est_dist_mean(tag1), 1000)
368+
369+
# 2. Acquire another instance of same model
370+
self.manager.acquire_model(
371+
tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor))
372+
self.assertEqual(_get_count_dist_max(tag1), 2)
373+
self.assertEqual(_get_est_dist_mean(tag1), 1000)
374+
375+
# 3. Acquire a different model
376+
self.manager.acquire_model(
377+
tag2, lambda: MockModel(tag2, 2000.0, self.mock_monitor))
378+
self.assertEqual(_get_count_dist_max(tag2), 1)
379+
self.assertEqual(_get_est_dist_mean(tag2), 2000)
380+
381+
# tag1 max count should remain 2
382+
self.assertEqual(_get_count_dist_max(tag1), 2)
383+
self.assertEqual(_get_est_dist_mean(tag1), 1000)
384+
385+
# 4. Delete all models
386+
self.manager._delete_all_models()
387+
# It retains the highest count it ever saw.
388+
self.assertEqual(_get_count_dist_max(tag1), 2)
389+
self.assertEqual(_get_count_dist_max(tag2), 1)
390+
391+
self.assertEqual(_get_est_dist_mean(tag1), 1000)
392+
self.assertEqual(_get_est_dist_mean(tag2), 2000)
393+
394+
# 5. Repopulate and force reset
395+
self.manager.acquire_model(
396+
tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor))
397+
# Max is still 2 from earlier in the test run
398+
self.assertEqual(_get_count_dist_max(tag1), 2)
399+
400+
self.manager._force_reset()
401+
self.assertEqual(_get_count_dist_max(tag1), 2)
402+
338403
def test_single_model_convergence_with_fluctuations(self):
339404
"""
340405
Tests that the estimator converges to the true usage with fluctuations.

0 commit comments

Comments (0)