@@ -207,12 +207,20 @@ func (this *Migrator) canStopStreaming() bool {
207207 return atomic .LoadInt64 (& this .migrationContext .CutOverCompleteFlag ) != 0
208208}
209209
210- // onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
211- func (this * Migrator ) onChangelogStateEvent (dmlEvent * binlog.BinlogDMLEvent ) (err error ) {
210+ // onChangelogEvent is called when a binlog event operation on the changelog table is intercepted.
211+ func (this * Migrator ) onChangelogEvent (dmlEvent * binlog.BinlogDMLEvent ) (err error ) {
212212 // Hey, I created the changelog table, I know the type of columns it has!
213- if hint := dmlEvent .NewColumnValues .StringColumn (2 ); hint != "state" {
213+ switch hint := dmlEvent .NewColumnValues .StringColumn (2 ); hint {
214+ case "state" :
215+ return this .onChangelogStateEvent (dmlEvent )
216+ case "heartbeat" :
217+ return this .onChangelogHeartbeatEvent (dmlEvent )
218+ default :
214219 return nil
215220 }
221+ }
222+
223+ func (this * Migrator ) onChangelogStateEvent (dmlEvent * binlog.BinlogDMLEvent ) (err error ) {
216224 changelogStateString := dmlEvent .NewColumnValues .StringColumn (3 )
217225 changelogState := ReadChangelogState (changelogStateString )
218226 this .migrationContext .Log .Infof ("Intercepted changelog state %s" , changelogState )
@@ -245,6 +253,18 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
245253 return nil
246254}
247255
256+ func (this * Migrator ) onChangelogHeartbeatEvent (dmlEvent * binlog.BinlogDMLEvent ) (err error ) {
257+ changelogHeartbeatString := dmlEvent .NewColumnValues .StringColumn (3 )
258+
259+ heartbeatTime , err := time .Parse (time .RFC3339Nano , changelogHeartbeatString )
260+ if err != nil {
261+ return this .migrationContext .Log .Errore (err )
262+ } else {
263+ this .migrationContext .SetLastHeartbeatOnChangelogTime (heartbeatTime )
264+ return nil
265+ }
266+ }
267+
248268// listenOnPanicAbort aborts on abort request
249269func (this * Migrator ) listenOnPanicAbort () {
250270 err := <- this .migrationContext .PanicAbort
@@ -476,6 +496,13 @@ func (this *Migrator) cutOver() (err error) {
476496 this .migrationContext .Log .Debugf ("checking for cut-over postpone" )
477497 this .sleepWhileTrue (
478498 func () (bool , error ) {
499+ heartbeatLag := this .migrationContext .TimeSinceLastHeartbeatOnChangelog ()
500+ maxLagMillisecondsThrottle := time .Duration (atomic .LoadInt64 (& this .migrationContext .MaxLagMillisecondsThrottleThreshold )) * time .Millisecond
501+ cutOverLockTimeout := time .Duration (this .migrationContext .CutOverLockTimeoutSeconds ) * time .Second
502+ if heartbeatLag > maxLagMillisecondsThrottle || heartbeatLag > cutOverLockTimeout {
503+ this .migrationContext .Log .Debugf ("current HeartbeatLag (%.2fs) is too high, it needs to be less than both --max-lag-millis (%.2fs) and --cut-over-lock-timeout-seconds (%.2fs) to continue" , heartbeatLag .Seconds (), maxLagMillisecondsThrottle .Seconds (), cutOverLockTimeout .Seconds ())
504+ return true , nil
505+ }
479506 if this .migrationContext .PostponeCutOverFlagFile == "" {
480507 return false , nil
481508 }
@@ -962,13 +989,14 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
962989
963990 currentBinlogCoordinates := * this .eventsStreamer .GetCurrentBinlogCoordinates ()
964991
965- status := fmt .Sprintf ("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, State: %s; ETA: %s" ,
992+ status := fmt .Sprintf ("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s" ,
966993 totalRowsCopied , rowsEstimate , progressPct ,
967994 atomic .LoadInt64 (& this .migrationContext .TotalDMLEventsApplied ),
968995 len (this .applyEventsQueue ), cap (this .applyEventsQueue ),
969996 base .PrettifyDurationOutput (elapsedTime ), base .PrettifyDurationOutput (this .migrationContext .ElapsedRowCopyTime ()),
970997 currentBinlogCoordinates ,
971998 this .migrationContext .GetCurrentLagDuration ().Seconds (),
999+ this .migrationContext .TimeSinceLastHeartbeatOnChangelog ().Seconds (),
9721000 state ,
9731001 eta ,
9741002 )
@@ -995,7 +1023,7 @@ func (this *Migrator) initiateStreaming() error {
9951023 this .migrationContext .DatabaseName ,
9961024 this .migrationContext .GetChangelogTableName (),
9971025 func (dmlEvent * binlog.BinlogDMLEvent ) error {
998- return this .onChangelogStateEvent (dmlEvent )
1026+ return this .onChangelogEvent (dmlEvent )
9991027 },
10001028 )
10011029
0 commit comments