@@ -207,12 +207,18 @@ func (this *Migrator) canStopStreaming() bool {
207207 return atomic .LoadInt64 (& this .migrationContext .CutOverCompleteFlag ) != 0
208208}
209209
210- // onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
211- func (this * Migrator ) onChangelogStateEvent (dmlEvent * binlog.BinlogDMLEvent ) (err error ) {
210+ // onChangelogEvent is called when a binlog event operation on the changelog table is intercepted.
211+ func (this * Migrator ) onChangelogEvent (dmlEvent * binlog.BinlogDMLEvent ) (err error ) {
212212 // Hey, I created the changelog table, I know the type of columns it has!
213- if hint := dmlEvent .NewColumnValues .StringColumn (2 ); hint != "state" {
213+ switch hint := dmlEvent .NewColumnValues .StringColumn (2 ); hint {
214+ case "state" :
215+ return this .onChangelogStateEvent (dmlEvent )
216+ default :
214217 return nil
215218 }
219+ }
220+
221+ func (this * Migrator ) onChangelogStateEvent (dmlEvent * binlog.BinlogDMLEvent ) (err error ) {
216222 changelogStateString := dmlEvent .NewColumnValues .StringColumn (3 )
217223 changelogState := ReadChangelogState (changelogStateString )
218224 this .migrationContext .Log .Infof ("Intercepted changelog state %s" , changelogState )
@@ -995,7 +1001,7 @@ func (this *Migrator) initiateStreaming() error {
9951001 this .migrationContext .DatabaseName ,
9961002 this .migrationContext .GetChangelogTableName (),
9971003 func (dmlEvent * binlog.BinlogDMLEvent ) error {
998- return this .onChangelogStateEvent (dmlEvent )
1004+ return this .onChangelogEvent (dmlEvent )
9991005 },
10001006 )
10011007
0 commit comments