@@ -85,32 +85,37 @@ func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error
8585}
8686
8787// collectReplicationLag reads the latest changelog heartbeat value
88- func (this * Throttler ) collectReplicationLag () {
89- ticker := time .Tick (time .Duration (this .migrationContext .HeartbeatIntervalMilliseconds ) * time .Millisecond )
90- for range ticker {
91- go func () error {
92- if atomic .LoadInt64 (& this .migrationContext .CleanupImminentFlag ) > 0 {
93- return nil
94- }
88+ func (this * Throttler ) collectReplicationLag (firstThrottlingCollected chan <- bool ) {
89+ collectFunc := func () error {
90+ if atomic .LoadInt64 (& this .migrationContext .CleanupImminentFlag ) > 0 {
91+ return nil
92+ }
9593
96- if this .migrationContext .TestOnReplica || this .migrationContext .MigrateOnReplica {
97- // when running on replica, the heartbeat injection is also done on the replica.
98- // This means we will always get a good heartbeat value.
99- // When runnign on replica, we should instead check the `SHOW SLAVE STATUS` output.
100- if lag , err := mysql .GetReplicationLag (this .inspector .connectionConfig ); err != nil {
101- return log .Errore (err )
102- } else {
103- atomic .StoreInt64 (& this .migrationContext .CurrentLag , int64 (lag ))
104- }
94+ if this .migrationContext .TestOnReplica || this .migrationContext .MigrateOnReplica {
95+ // when running on replica, the heartbeat injection is also done on the replica.
96+ // This means we will always get a good heartbeat value.
97+ // When runnign on replica, we should instead check the `SHOW SLAVE STATUS` output.
98+ if lag , err := mysql .GetReplicationLag (this .inspector .connectionConfig ); err != nil {
99+ return log .Errore (err )
105100 } else {
106- if heartbeatValue , err := this .inspector .readChangelogState ("heartbeat" ); err != nil {
107- return log .Errore (err )
108- } else {
109- this .parseChangelogHeartbeat (heartbeatValue )
110- }
101+ atomic .StoreInt64 (& this .migrationContext .CurrentLag , int64 (lag ))
111102 }
112- return nil
113- }()
103+ } else {
104+ if heartbeatValue , err := this .inspector .readChangelogState ("heartbeat" ); err != nil {
105+ return log .Errore (err )
106+ } else {
107+ this .parseChangelogHeartbeat (heartbeatValue )
108+ }
109+ }
110+ return nil
111+ }
112+
113+ collectFunc ()
114+ firstThrottlingCollected <- true
115+
116+ ticker := time .Tick (time .Duration (this .migrationContext .HeartbeatIntervalMilliseconds ) * time .Millisecond )
117+ for range ticker {
118+ go collectFunc ()
114119 }
115120}
116121
@@ -285,13 +290,14 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
285290// that may affect throttling. There are several components, all running independently,
286291// that collect such metrics.
287292func (this * Throttler ) initiateThrottlerCollection (firstThrottlingCollected chan <- bool ) {
288- go this .collectReplicationLag ()
293+ go this .collectReplicationLag (firstThrottlingCollected )
289294 go this .collectControlReplicasLag ()
290295
291296 go func () {
292- throttlerMetricsTick := time .Tick (1 * time .Second )
293297 this .collectGeneralThrottleMetrics ()
294298 firstThrottlingCollected <- true
299+
300+ throttlerMetricsTick := time .Tick (1 * time .Second )
295301 for range throttlerMetricsTick {
296302 this .collectGeneralThrottleMetrics ()
297303 }
0 commit comments