@@ -24,6 +24,28 @@ const (
2424 atomicCutOverMagicHint = "ghost-cut-over-sentry"
2525)
2626
27+ type dmlBuildResult struct {
28+ query string
29+ args []interface {}
30+ rowsDelta int64
31+ err error
32+ }
33+
34+ func newDmlBuildResult (query string , args []interface {}, rowsDelta int64 , err error ) * dmlBuildResult {
35+ return & dmlBuildResult {
36+ query : query ,
37+ args : args ,
38+ rowsDelta : rowsDelta ,
39+ err : err ,
40+ }
41+ }
42+
43+ func newDmlBuildResultError (err error ) * dmlBuildResult {
44+ return & dmlBuildResult {
45+ err : err ,
46+ }
47+ }
48+
2749// Applier connects and writes the the applier-server, which is the server where migration
2850// happens. This is typically the master, but could be a replica when `--test-on-replica` or
2951// `--execute-on-replica` are given.
@@ -899,79 +921,103 @@ func (this *Applier) ShowStatusVariable(variableName string) (result int64, err
899921 return result , nil
900922}
901923
924+ // updateModifiesUniqueKeyColumns checks whether a UPDATE DML event actually
925+ // modifies values of the migration's unique key (the iterated key). This will call
926+ // for special handling.
927+ func (this * Applier ) updateModifiesUniqueKeyColumns (dmlEvent * binlog.BinlogDMLEvent ) (modifiedColumn string , isModified bool ) {
928+ for _ , column := range this .migrationContext .UniqueKey .Columns .Columns () {
929+ tableOrdinal := this .migrationContext .OriginalTableColumns .Ordinals [column .Name ]
930+ whereColumnValue := dmlEvent .WhereColumnValues .AbstractValues ()[tableOrdinal ]
931+ newColumnValue := dmlEvent .NewColumnValues .AbstractValues ()[tableOrdinal ]
932+ if newColumnValue != whereColumnValue {
933+ return column .Name , true
934+ }
935+ }
936+ return "" , false
937+ }
938+
902939// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
903940// event entry on the original table.
904- func (this * Applier ) buildDMLEventQuery (dmlEvent * binlog.BinlogDMLEvent ) (query string , args [] interface {}, rowsDelta int64 , err error ) {
941+ func (this * Applier ) buildDMLEventQuery (dmlEvent * binlog.BinlogDMLEvent ) (results []( * dmlBuildResult ) ) {
905942 switch dmlEvent .DML {
906943 case binlog .DeleteDML :
907944 {
908945 query , uniqueKeyArgs , err := sql .BuildDMLDeleteQuery (dmlEvent .DatabaseName , this .migrationContext .GetGhostTableName (), this .migrationContext .OriginalTableColumns , & this .migrationContext .UniqueKey .Columns , dmlEvent .WhereColumnValues .AbstractValues ())
909- return query , uniqueKeyArgs , - 1 , err
946+ return append ( results , newDmlBuildResult ( query , uniqueKeyArgs , - 1 , err ))
910947 }
911948 case binlog .InsertDML :
912949 {
913950 query , sharedArgs , err := sql .BuildDMLInsertQuery (dmlEvent .DatabaseName , this .migrationContext .GetGhostTableName (), this .migrationContext .OriginalTableColumns , this .migrationContext .SharedColumns , this .migrationContext .MappedSharedColumns , dmlEvent .NewColumnValues .AbstractValues ())
914- return query , sharedArgs , 1 , err
951+ return append ( results , newDmlBuildResult ( query , sharedArgs , 1 , err ))
915952 }
916953 case binlog .UpdateDML :
917954 {
955+ if _ , isModified := this .updateModifiesUniqueKeyColumns (dmlEvent ); isModified {
956+ dmlEvent .DML = binlog .DeleteDML
957+ results = append (results , this .buildDMLEventQuery (dmlEvent )... )
958+ dmlEvent .DML = binlog .InsertDML
959+ results = append (results , this .buildDMLEventQuery (dmlEvent )... )
960+ return results
961+ }
918962 query , sharedArgs , uniqueKeyArgs , err := sql .BuildDMLUpdateQuery (dmlEvent .DatabaseName , this .migrationContext .GetGhostTableName (), this .migrationContext .OriginalTableColumns , this .migrationContext .SharedColumns , this .migrationContext .MappedSharedColumns , & this .migrationContext .UniqueKey .Columns , dmlEvent .NewColumnValues .AbstractValues (), dmlEvent .WhereColumnValues .AbstractValues ())
963+ args := sqlutils .Args ()
919964 args = append (args , sharedArgs ... )
920965 args = append (args , uniqueKeyArgs ... )
921- return query , args , 0 , err
966+ return append ( results , newDmlBuildResult ( query , args , 0 , err ))
922967 }
923968 }
924- return "" , args , 0 , fmt .Errorf ("Unknown dml event type: %+v" , dmlEvent .DML )
969+ return append ( results , newDmlBuildResultError ( fmt .Errorf ("Unknown dml event type: %+v" , dmlEvent .DML )) )
925970}
926971
927972// ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted
928973// original-table binlog event
929974func (this * Applier ) ApplyDMLEventQuery (dmlEvent * binlog.BinlogDMLEvent ) error {
930- query , args , rowDelta , err := this .buildDMLEventQuery (dmlEvent )
931- if err != nil {
932- return err
933- }
934- // TODO The below is in preparation for transactional writes on the ghost tables.
935- // Such writes would be, for example:
936- // - prepended with sql_mode setup
937- // - prepended with time zone setup
938- // - prepended with SET SQL_LOG_BIN=0
939- // - prepended with SET FK_CHECKS=0
940- // etc.
941- //
942- // a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
943- // is solved by silently converting unsigned bigints to string values.
944- //
945-
946- err = func () error {
947- tx , err := this .db .Begin ()
948- if err != nil {
949- return err
975+ for _ , buildResult := range this .buildDMLEventQuery (dmlEvent ) {
976+ if buildResult .err != nil {
977+ return buildResult .err
950978 }
951- sessionQuery := `SET
979+ // TODO The below is in preparation for transactional writes on the ghost tables.
980+ // Such writes would be, for example:
981+ // - prepended with sql_mode setup
982+ // - prepended with time zone setup
983+ // - prepended with SET SQL_LOG_BIN=0
984+ // - prepended with SET FK_CHECKS=0
985+ // etc.
986+ //
987+ // a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
988+ // is solved by silently converting unsigned bigints to string values.
989+ //
990+
991+ err := func () error {
992+ tx , err := this .db .Begin ()
993+ if err != nil {
994+ return err
995+ }
996+ sessionQuery := `SET
952997 SESSION time_zone = '+00:00',
953998 sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
954999 `
955- if _ , err := tx .Exec (sessionQuery ); err != nil {
956- return err
957- }
958- if _ , err := tx .Exec (query , args ... ); err != nil {
959- return err
1000+ if _ , err := tx .Exec (sessionQuery ); err != nil {
1001+ return err
1002+ }
1003+ if _ , err := tx .Exec (buildResult .query , buildResult .args ... ); err != nil {
1004+ return err
1005+ }
1006+ if err := tx .Commit (); err != nil {
1007+ return err
1008+ }
1009+ return nil
1010+ }()
1011+
1012+ if err != nil {
1013+ err = fmt .Errorf ("%s; query=%s; args=%+v" , err .Error (), buildResult .query , buildResult .args )
1014+ return log .Errore (err )
9601015 }
961- if err := tx .Commit (); err != nil {
962- return err
1016+ // no error
1017+ atomic .AddInt64 (& this .migrationContext .TotalDMLEventsApplied , 1 )
1018+ if this .migrationContext .CountTableRows {
1019+ atomic .AddInt64 (& this .migrationContext .RowsDeltaEstimate , buildResult .rowsDelta )
9631020 }
964- return nil
965- }()
966-
967- if err != nil {
968- err = fmt .Errorf ("%s; query=%s; args=%+v" , err .Error (), query , args )
969- return log .Errore (err )
970- }
971- // no error
972- atomic .AddInt64 (& this .migrationContext .TotalDMLEventsApplied , 1 )
973- if this .migrationContext .CountTableRows {
974- atomic .AddInt64 (& this .migrationContext .RowsDeltaEstimate , rowDelta )
9751021 }
9761022 return nil
9771023}
@@ -1000,15 +1046,16 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
10001046 return rollback (err )
10011047 }
10021048 for _ , dmlEvent := range dmlEvents {
1003- query , args , rowDelta , err := this .buildDMLEventQuery (dmlEvent )
1004- if err != nil {
1005- return rollback (err )
1006- }
1007- if _ , err := tx .Exec (query , args ... ); err != nil {
1008- err = fmt .Errorf ("%s; query=%s; args=%+v" , err .Error (), query , args )
1009- return rollback (err )
1049+ for _ , buildResult := range this .buildDMLEventQuery (dmlEvent ) {
1050+ if buildResult .err != nil {
1051+ return rollback (buildResult .err )
1052+ }
1053+ if _ , err := tx .Exec (buildResult .query , buildResult .args ... ); err != nil {
1054+ err = fmt .Errorf ("%s; query=%s; args=%+v" , err .Error (), buildResult .query , buildResult .args )
1055+ return rollback (err )
1056+ }
1057+ totalDelta += buildResult .rowsDelta
10101058 }
1011- totalDelta += rowDelta
10121059 }
10131060 if err := tx .Commit (); err != nil {
10141061 return err
0 commit comments