@@ -84,18 +84,30 @@ func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error
8484 }
8585}
8686
87- // collectHeartbeat reads the latest changelog heartbeat value
88- func (this * Throttler ) collectHeartbeat () {
87+ // collectReplicationLag reads the latest changelog heartbeat value
88+ func (this * Throttler ) collectReplicationLag () {
8989 ticker := time .Tick (time .Duration (this .migrationContext .HeartbeatIntervalMilliseconds ) * time .Millisecond )
9090 for range ticker {
9191 go func () error {
9292 if atomic .LoadInt64 (& this .migrationContext .CleanupImminentFlag ) > 0 {
9393 return nil
9494 }
95- if heartbeatValue , err := this .inspector .readChangelogState ("heartbeat" ); err != nil {
96- return log .Errore (err )
95+
96+ if this .migrationContext .TestOnReplica || this .migrationContext .MigrateOnReplica {
97+ // when running on replica, the heartbeat injection is also done on the replica.
98+ // This means we will always get a good heartbeat value.
99+ // When runnign on replica, we should instead check the `SHOW SLAVE STATUS` output.
100+ if lag , err := mysql .GetReplicationLag (this .inspector .connectionConfig ); err != nil {
101+ return log .Errore (err )
102+ } else {
103+ atomic .StoreInt64 (& this .migrationContext .CurrentLag , int64 (lag ))
104+ }
97105 } else {
98- this .parseChangelogHeartbeat (heartbeatValue )
106+ if heartbeatValue , err := this .inspector .readChangelogState ("heartbeat" ); err != nil {
107+ return log .Errore (err )
108+ } else {
109+ this .parseChangelogHeartbeat (heartbeatValue )
110+ }
99111 }
100112 return nil
101113 }()
@@ -114,6 +126,7 @@ func (this *Throttler) collectControlReplicasLag() {
114126
115127 readReplicaLag := func (connectionConfig * mysql.ConnectionConfig ) (lag time.Duration , err error ) {
116128 dbUri := connectionConfig .GetDBUri ("information_schema" )
129+
117130 var heartbeatValue string
118131 if db , _ , err := sqlutils .GetDB (dbUri ); err != nil {
119132 return lag , err
@@ -272,7 +285,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
272285// that may affect throttling. There are several components, all running independently,
273286// that collect such metrics.
274287func (this * Throttler ) initiateThrottlerCollection (firstThrottlingCollected chan <- bool ) {
275- go this .collectHeartbeat ()
288+ go this .collectReplicationLag ()
276289 go this .collectControlReplicasLag ()
277290
278291 go func () {
0 commit comments