@@ -24,6 +24,28 @@ const (
2424 atomicCutOverMagicHint = "ghost-cut-over-sentry"
2525)
2626
27+ type dmlBuildResult struct {
28+ query string
29+ args []interface {}
30+ rowsDelta int64
31+ err error
32+ }
33+
34+ func newDmlBuildResult (query string , args []interface {}, rowsDelta int64 , err error ) * dmlBuildResult {
35+ return & dmlBuildResult {
36+ query : query ,
37+ args : args ,
38+ rowsDelta : rowsDelta ,
39+ err : err ,
40+ }
41+ }
42+
43+ func newDmlBuildResultError (err error ) * dmlBuildResult {
44+ return & dmlBuildResult {
45+ err : err ,
46+ }
47+ }
48+
2749// Applier connects and writes the the applier-server, which is the server where migration
2850// happens. This is typically the master, but could be a replica when `--test-on-replica` or
2951// `--execute-on-replica` are given.
@@ -904,79 +926,103 @@ func (this *Applier) ShowStatusVariable(variableName string) (result int64, err
904926 return result , nil
905927}
906928
929+ // updateModifiesUniqueKeyColumns checks whether a UPDATE DML event actually
930+ // modifies values of the migration's unique key (the iterated key). This will call
931+ // for special handling.
932+ func (this * Applier ) updateModifiesUniqueKeyColumns (dmlEvent * binlog.BinlogDMLEvent ) (modifiedColumn string , isModified bool ) {
933+ for _ , column := range this .migrationContext .UniqueKey .Columns .Columns () {
934+ tableOrdinal := this .migrationContext .OriginalTableColumns .Ordinals [column .Name ]
935+ whereColumnValue := dmlEvent .WhereColumnValues .AbstractValues ()[tableOrdinal ]
936+ newColumnValue := dmlEvent .NewColumnValues .AbstractValues ()[tableOrdinal ]
937+ if newColumnValue != whereColumnValue {
938+ return column .Name , true
939+ }
940+ }
941+ return "" , false
942+ }
943+
907944// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
908945// event entry on the original table.
909- func (this * Applier ) buildDMLEventQuery (dmlEvent * binlog.BinlogDMLEvent ) (query string , args [] interface {}, rowsDelta int64 , err error ) {
946+ func (this * Applier ) buildDMLEventQuery (dmlEvent * binlog.BinlogDMLEvent ) (results []( * dmlBuildResult ) ) {
910947 switch dmlEvent .DML {
911948 case binlog .DeleteDML :
912949 {
913950 query , uniqueKeyArgs , err := sql .BuildDMLDeleteQuery (dmlEvent .DatabaseName , this .migrationContext .GetGhostTableName (), this .migrationContext .OriginalTableColumns , & this .migrationContext .UniqueKey .Columns , dmlEvent .WhereColumnValues .AbstractValues ())
914- return query , uniqueKeyArgs , - 1 , err
951+ return append ( results , newDmlBuildResult ( query , uniqueKeyArgs , - 1 , err ))
915952 }
916953 case binlog .InsertDML :
917954 {
918955 query , sharedArgs , err := sql .BuildDMLInsertQuery (dmlEvent .DatabaseName , this .migrationContext .GetGhostTableName (), this .migrationContext .OriginalTableColumns , this .migrationContext .SharedColumns , this .migrationContext .MappedSharedColumns , dmlEvent .NewColumnValues .AbstractValues ())
919- return query , sharedArgs , 1 , err
956+ return append ( results , newDmlBuildResult ( query , sharedArgs , 1 , err ))
920957 }
921958 case binlog .UpdateDML :
922959 {
960+ if _ , isModified := this .updateModifiesUniqueKeyColumns (dmlEvent ); isModified {
961+ dmlEvent .DML = binlog .DeleteDML
962+ results = append (results , this .buildDMLEventQuery (dmlEvent )... )
963+ dmlEvent .DML = binlog .InsertDML
964+ results = append (results , this .buildDMLEventQuery (dmlEvent )... )
965+ return results
966+ }
923967 query , sharedArgs , uniqueKeyArgs , err := sql .BuildDMLUpdateQuery (dmlEvent .DatabaseName , this .migrationContext .GetGhostTableName (), this .migrationContext .OriginalTableColumns , this .migrationContext .SharedColumns , this .migrationContext .MappedSharedColumns , & this .migrationContext .UniqueKey .Columns , dmlEvent .NewColumnValues .AbstractValues (), dmlEvent .WhereColumnValues .AbstractValues ())
968+ args := sqlutils .Args ()
924969 args = append (args , sharedArgs ... )
925970 args = append (args , uniqueKeyArgs ... )
926- return query , args , 0 , err
971+ return append ( results , newDmlBuildResult ( query , args , 0 , err ))
927972 }
928973 }
929- return "" , args , 0 , fmt .Errorf ("Unknown dml event type: %+v" , dmlEvent .DML )
974+ return append ( results , newDmlBuildResultError ( fmt .Errorf ("Unknown dml event type: %+v" , dmlEvent .DML )) )
930975}
931976
932977// ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted
933978// original-table binlog event
934979func (this * Applier ) ApplyDMLEventQuery (dmlEvent * binlog.BinlogDMLEvent ) error {
935- query , args , rowDelta , err := this .buildDMLEventQuery (dmlEvent )
936- if err != nil {
937- return err
938- }
939- // TODO The below is in preparation for transactional writes on the ghost tables.
940- // Such writes would be, for example:
941- // - prepended with sql_mode setup
942- // - prepended with time zone setup
943- // - prepended with SET SQL_LOG_BIN=0
944- // - prepended with SET FK_CHECKS=0
945- // etc.
946- //
947- // a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
948- // is solved by silently converting unsigned bigints to string values.
949- //
950-
951- err = func () error {
952- tx , err := this .db .Begin ()
953- if err != nil {
954- return err
980+ for _ , buildResult := range this .buildDMLEventQuery (dmlEvent ) {
981+ if buildResult .err != nil {
982+ return buildResult .err
955983 }
956- sessionQuery := `SET
984+ // TODO The below is in preparation for transactional writes on the ghost tables.
985+ // Such writes would be, for example:
986+ // - prepended with sql_mode setup
987+ // - prepended with time zone setup
988+ // - prepended with SET SQL_LOG_BIN=0
989+ // - prepended with SET FK_CHECKS=0
990+ // etc.
991+ //
992+ // a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
993+ // is solved by silently converting unsigned bigints to string values.
994+ //
995+
996+ err := func () error {
997+ tx , err := this .db .Begin ()
998+ if err != nil {
999+ return err
1000+ }
1001+ sessionQuery := `SET
9571002 SESSION time_zone = '+00:00',
9581003 sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
9591004 `
960- if _ , err := tx .Exec (sessionQuery ); err != nil {
961- return err
962- }
963- if _ , err := tx .Exec (query , args ... ); err != nil {
964- return err
1005+ if _ , err := tx .Exec (sessionQuery ); err != nil {
1006+ return err
1007+ }
1008+ if _ , err := tx .Exec (buildResult .query , buildResult .args ... ); err != nil {
1009+ return err
1010+ }
1011+ if err := tx .Commit (); err != nil {
1012+ return err
1013+ }
1014+ return nil
1015+ }()
1016+
1017+ if err != nil {
1018+ err = fmt .Errorf ("%s; query=%s; args=%+v" , err .Error (), buildResult .query , buildResult .args )
1019+ return log .Errore (err )
9651020 }
966- if err := tx .Commit (); err != nil {
967- return err
1021+ // no error
1022+ atomic .AddInt64 (& this .migrationContext .TotalDMLEventsApplied , 1 )
1023+ if this .migrationContext .CountTableRows {
1024+ atomic .AddInt64 (& this .migrationContext .RowsDeltaEstimate , buildResult .rowsDelta )
9681025 }
969- return nil
970- }()
971-
972- if err != nil {
973- err = fmt .Errorf ("%s; query=%s; args=%+v" , err .Error (), query , args )
974- return log .Errore (err )
975- }
976- // no error
977- atomic .AddInt64 (& this .migrationContext .TotalDMLEventsApplied , 1 )
978- if this .migrationContext .CountTableRows {
979- atomic .AddInt64 (& this .migrationContext .RowsDeltaEstimate , rowDelta )
9801026 }
9811027 return nil
9821028}
@@ -1005,15 +1051,16 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
10051051 return rollback (err )
10061052 }
10071053 for _ , dmlEvent := range dmlEvents {
1008- query , args , rowDelta , err := this .buildDMLEventQuery (dmlEvent )
1009- if err != nil {
1010- return rollback (err )
1011- }
1012- if _ , err := tx .Exec (query , args ... ); err != nil {
1013- err = fmt .Errorf ("%s; query=%s; args=%+v" , err .Error (), query , args )
1014- return rollback (err )
1054+ for _ , buildResult := range this .buildDMLEventQuery (dmlEvent ) {
1055+ if buildResult .err != nil {
1056+ return rollback (buildResult .err )
1057+ }
1058+ if _ , err := tx .Exec (buildResult .query , buildResult .args ... ); err != nil {
1059+ err = fmt .Errorf ("%s; query=%s; args=%+v" , err .Error (), buildResult .query , buildResult .args )
1060+ return rollback (err )
1061+ }
1062+ totalDelta += buildResult .rowsDelta
10151063 }
1016- totalDelta += rowDelta
10171064 }
10181065 if err := tx .Commit (); err != nil {
10191066 return err
0 commit comments