Skip to content

Commit fac1ba7

Browse files
committed
Throttler teardown
1 parent 7ef284a commit fac1ba7

1 file changed

Lines changed: 30 additions & 6 deletions

File tree

go/logic/throttler.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,18 @@ const frenoMagicHint = "freno"
4141
// Throttler collects metrics related to throttling and makes informed decision
4242
// whether throttling should take place.
4343
type Throttler struct {
44-
migrationContext *base.MigrationContext
45-
applier *Applier
46-
inspector *Inspector
44+
migrationContext *base.MigrationContext
45+
applier *Applier
46+
inspector *Inspector
47+
finishedMigrating int64
4748
}
4849

4950
func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector) *Throttler {
5051
return &Throttler{
51-
migrationContext: migrationContext,
52-
applier: applier,
53-
inspector: inspector,
52+
migrationContext: migrationContext,
53+
applier: applier,
54+
inspector: inspector,
55+
finishedMigrating: 0,
5456
}
5557
}
5658

@@ -159,6 +161,9 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo
159161

160162
ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
161163
for range ticker {
164+
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
165+
return
166+
}
162167
go collectFunc()
163168
}
164169
}
@@ -233,6 +238,9 @@ func (this *Throttler) collectControlReplicasLag() {
233238
shouldReadLagAggressively := false
234239

235240
for range aggressiveTicker {
241+
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
242+
return
243+
}
236244
if counter%relaxedFactor == 0 {
237245
// we only check if we wish to be aggressive once per second. The parameters for being aggressive
238246
// do not typically change at all throughout the migration, but nonetheless we check them.
@@ -285,6 +293,10 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<-
285293

286294
ticker := time.Tick(100 * time.Millisecond)
287295
for range ticker {
296+
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
297+
return
298+
}
299+
288300
if sleep, _ := collectFunc(); sleep {
289301
time.Sleep(1 * time.Second)
290302
}
@@ -393,6 +405,10 @@ func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan
393405

394406
throttlerMetricsTick := time.Tick(1 * time.Second)
395407
for range throttlerMetricsTick {
408+
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
409+
return
410+
}
411+
396412
this.collectGeneralThrottleMetrics()
397413
}
398414
}()
@@ -419,6 +435,9 @@ func (this *Throttler) initiateThrottlerChecks() error {
419435
}
420436
throttlerFunction()
421437
for range throttlerTick {
438+
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
439+
return nil
440+
}
422441
throttlerFunction()
423442
}
424443

@@ -440,3 +459,8 @@ func (this *Throttler) throttle(onThrottled func()) {
440459
time.Sleep(250 * time.Millisecond)
441460
}
442461
}
462+
463+
func (this *Throttler) Teardown() {
464+
this.migrationContext.Log.Debugf("Tearing down...")
465+
atomic.StoreInt64(&this.finishedMigrating, 1)
466+
}

0 commit comments

Comments
 (0)