@@ -1025,14 +1025,21 @@ func (this *Migrator) iterateChunks() error {
10251025}
10261026
10271027func (this * Migrator ) onApplyEventStruct (eventStruct * applyEventStruct ) error {
1028- if eventStruct .writeFunc != nil {
1029- if err := this .retryOperation (* eventStruct .writeFunc ); err != nil {
1030- return log .Errore (err )
1028+ handleNonDMLEventStruct := func (eventStruct * applyEventStruct ) error {
1029+ if eventStruct .writeFunc != nil {
1030+ if err := this .retryOperation (* eventStruct .writeFunc ); err != nil {
1031+ return log .Errore (err )
1032+ }
10311033 }
1034+ return nil
1035+ }
1036+ if eventStruct .dmlEvent == nil {
1037+ return handleNonDMLEventStruct (eventStruct )
10321038 }
10331039 if eventStruct .dmlEvent != nil {
10341040 dmlEvents := [](* binlog.BinlogDMLEvent ){}
10351041 dmlEvents = append (dmlEvents , eventStruct .dmlEvent )
1042+ var nonDmlStructToApply * applyEventStruct
10361043
10371044 availableEvents := len (this .applyEventsQueue )
10381045 batchSize := int (atomic .LoadInt64 (& this .migrationContext .DMLBatchSize ))
@@ -1043,6 +1050,7 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
10431050 additionalStruct := <- this .applyEventsQueue
10441051 if additionalStruct .dmlEvent == nil {
10451052 // Not a DML. We don't group this, and we don't batch any further
1053+ nonDmlStructToApply = additionalStruct
10461054 break
10471055 }
10481056 dmlEvents = append (dmlEvents , additionalStruct .dmlEvent )
@@ -1054,6 +1062,13 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
10541062 if err := this .retryOperation (applyEventFunc ); err != nil {
10551063 return log .Errore (err )
10561064 }
1065+ if nonDmlStructToApply != nil {
1066+ // We pulled DML events from the queue, and then we hit a non-DML event. Wait!
1067+ // We need to handle it!
1068+ if err := handleNonDMLEventStruct (nonDmlStructToApply ); err != nil {
1069+ return log .Errore (err )
1070+ }
1071+ }
10571072 }
10581073 return nil
10591074}
0 commit comments