@@ -1024,6 +1024,40 @@ func (this *Migrator) iterateChunks() error {
10241024 return nil
10251025}
10261026
1027+ func (this * Migrator ) onApplyEventStruct (eventStruct * applyEventStruct ) error {
1028+ if eventStruct .writeFunc != nil {
1029+ if err := this .retryOperation (* eventStruct .writeFunc ); err != nil {
1030+ return log .Errore (err )
1031+ }
1032+ }
1033+ if eventStruct .dmlEvent != nil {
1034+ dmlEvents := [](* binlog.BinlogDMLEvent ){}
1035+ dmlEvents = append (dmlEvents , eventStruct .dmlEvent )
1036+
1037+ availableEvents := len (this .applyEventsQueue )
1038+ batchSize := int (atomic .LoadInt64 (& this .migrationContext .DMLBatchSize ))
1039+ if availableEvents > batchSize {
1040+ availableEvents = batchSize
1041+ }
1042+ for i := 0 ; i < availableEvents ; i ++ {
1043+ additionalStruct := <- this .applyEventsQueue
1044+ if additionalStruct .dmlEvent == nil {
1045+ // Not a DML. We don't group this, and we don't batch any further
1046+ break
1047+ }
1048+ dmlEvents = append (dmlEvents , additionalStruct .dmlEvent )
1049+ }
1050+ // Create a task to apply the DML event; this will be execute by executeWriteFuncs()
1051+ var applyEventFunc tableWriteFunc = func () error {
1052+ return this .applier .ApplyDMLEventQueries (dmlEvents )
1053+ }
1054+ if err := this .retryOperation (applyEventFunc ); err != nil {
1055+ return log .Errore (err )
1056+ }
1057+ }
1058+ return nil
1059+ }
1060+
10271061// executeWriteFuncs writes data via applier: both the rowcopy and the events backlog.
10281062// This is where the ghost table gets the data. The function fills the data single-threaded.
10291063// Both event backlog and rowcopy events are polled; the backlog events have precedence.
@@ -1038,38 +1072,9 @@ func (this *Migrator) executeWriteFuncs() error {
10381072 // We give higher priority to event processing, then secondary priority to
10391073 // rowcopy
10401074 select {
1041- case applyEventStruct := <- this .applyEventsQueue :
1075+ case eventStruct := <- this .applyEventsQueue :
10421076 {
1043- if applyEventStruct .writeFunc != nil {
1044- if err := this .retryOperation (* applyEventStruct .writeFunc ); err != nil {
1045- return log .Errore (err )
1046- }
1047- }
1048- if applyEventStruct .dmlEvent != nil {
1049- dmlEvents := [](* binlog.BinlogDMLEvent ){}
1050- dmlEvents = append (dmlEvents , applyEventStruct .dmlEvent )
1051-
1052- availableEvents := len (this .applyEventsQueue )
1053- batchSize := int (atomic .LoadInt64 (& this .migrationContext .DMLBatchSize ))
1054- if availableEvents > batchSize {
1055- availableEvents = batchSize
1056- }
1057- for i := 0 ; i < availableEvents ; i ++ {
1058- additionalStruct := <- this .applyEventsQueue
1059- if additionalStruct .dmlEvent == nil {
1060- // Not a DML. We don't group this, and we don't batch any further
1061- break
1062- }
1063- dmlEvents = append (dmlEvents , additionalStruct .dmlEvent )
1064- }
1065- // Create a task to apply the DML event; this will be execute by executeWriteFuncs()
1066- var applyEventFunc tableWriteFunc = func () error {
1067- return this .applier .ApplyDMLEventQueries (dmlEvents )
1068- }
1069- if err := this .retryOperation (applyEventFunc ); err != nil {
1070- return log .Errore (err )
1071- }
1072- }
1077+ this .onApplyEventStruct (eventStruct )
10731078 }
10741079 default :
10751080 {
0 commit comments