Skip to content

Commit f5dd7fd

Browse files
author
Shlomi Noach
authored
Merge pull request #365 from github/replication-lag-check-test-on-replica
testing/running on replica: gets lags via SHOW SLAVE STATUS
2 parents 8914bc2 + 481b0ce commit f5dd7fd

1 file changed

Lines changed: 19 additions & 6 deletions

File tree

go/logic/throttler.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,18 +84,30 @@ func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error
8484
}
8585
}
8686

87-
// collectHeartbeat reads the latest changelog heartbeat value
88-
func (this *Throttler) collectHeartbeat() {
87+
// collectReplicationLag reads the latest changelog heartbeat value
88+
func (this *Throttler) collectReplicationLag() {
8989
ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
9090
for range ticker {
9191
go func() error {
9292
if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
9393
return nil
9494
}
95-
if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
96-
return log.Errore(err)
95+
96+
if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
97+
// when running on replica, the heartbeat injection is also done on the replica.
98+
// This means we will always get a good heartbeat value.
99+
// When runnign on replica, we should instead check the `SHOW SLAVE STATUS` output.
100+
if lag, err := mysql.GetReplicationLag(this.inspector.connectionConfig); err != nil {
101+
return log.Errore(err)
102+
} else {
103+
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
104+
}
97105
} else {
98-
this.parseChangelogHeartbeat(heartbeatValue)
106+
if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
107+
return log.Errore(err)
108+
} else {
109+
this.parseChangelogHeartbeat(heartbeatValue)
110+
}
99111
}
100112
return nil
101113
}()
@@ -114,6 +126,7 @@ func (this *Throttler) collectControlReplicasLag() {
114126

115127
readReplicaLag := func(connectionConfig *mysql.ConnectionConfig) (lag time.Duration, err error) {
116128
dbUri := connectionConfig.GetDBUri("information_schema")
129+
117130
var heartbeatValue string
118131
if db, _, err := sqlutils.GetDB(dbUri); err != nil {
119132
return lag, err
@@ -272,7 +285,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
272285
// that may affect throttling. There are several components, all running independently,
273286
// that collect such metrics.
274287
func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
275-
go this.collectHeartbeat()
288+
go this.collectReplicationLag()
276289
go this.collectControlReplicasLag()
277290

278291
go func() {

0 commit comments

Comments
 (0)