@@ -10,6 +10,7 @@ import (
1010 gosql "database/sql"
1111 "strings"
1212 "testing"
13+ "time"
1314
1415 "github.com/stretchr/testify/require"
1516 "github.com/stretchr/testify/suite"
@@ -201,6 +202,30 @@ type ApplierTestSuite struct {
201202 mysqlContainer testcontainers.Container
202203}
203204
205+ func (suite * ApplierTestSuite ) getConnectionConfig (ctx context.Context ) (* mysql.ConnectionConfig , error ) {
206+ host , err := suite .mysqlContainer .ContainerIP (ctx )
207+ if err != nil {
208+ return nil , err
209+ }
210+
211+ config := mysql .NewConnectionConfig ()
212+ config .Key .Hostname = host
213+ config .Key .Port = 3306
214+ config .User = "root"
215+ config .Password = "root-password"
216+
217+ return config , nil
218+ }
219+
220+ func (suite * ApplierTestSuite ) getDb (ctx context.Context ) (* gosql.DB , error ) {
221+ host , err := suite .mysqlContainer .ContainerIP (ctx )
222+ if err != nil {
223+ return nil , err
224+ }
225+
226+ return gosql .Open ("mysql" , "root:root-password@tcp(" + host + ":3306)/test" )
227+ }
228+
204229func (suite * ApplierTestSuite ) SetupSuite () {
205230 ctx := context .Background ()
206231 req := testcontainers.ContainerRequest {
@@ -231,7 +256,7 @@ func (suite *ApplierTestSuite) SetupTest() {
231256 suite .Require ().NoError (err )
232257 suite .Require ().Equalf (0 , rc , "failed to created database: expected exit code 0, got %d" , rc )
233258
234- rc , _ , err = suite .mysqlContainer .Exec (ctx , []string {"mysql" , "-uroot" , "-proot-password" , "-e" , "CREATE TABLE test.testing (id INT, item_id INT);" })
259+ rc , _ , err = suite .mysqlContainer .Exec (ctx , []string {"mysql" , "-uroot" , "-proot-password" , "-e" , "CREATE TABLE test.testing (id INT, item_id INT, PRIMARY KEY (id) );" })
235260 suite .Require ().NoError (err )
236261 suite .Require ().Equalf (0 , rc , "failed to created table: expected exit code 0, got %d" , rc )
237262}
@@ -247,15 +272,11 @@ func (suite *ApplierTestSuite) TearDownTest() {
247272func (suite * ApplierTestSuite ) TestInitDBConnections () {
248273 ctx := context .Background ()
249274
250- host , err := suite .mysqlContainer . ContainerIP (ctx )
275+ connectionConfig , err := suite .getConnectionConfig (ctx )
251276 suite .Require ().NoError (err )
252277
253278 migrationContext := base .NewMigrationContext ()
254- migrationContext .ApplierConnectionConfig = mysql .NewConnectionConfig ()
255- migrationContext .ApplierConnectionConfig .Key .Hostname = host
256- migrationContext .ApplierConnectionConfig .Key .Port = 3306
257- migrationContext .ApplierConnectionConfig .User = "root"
258- migrationContext .ApplierConnectionConfig .Password = "root-password"
279+ migrationContext .ApplierConnectionConfig = connectionConfig
259280 migrationContext .DatabaseName = "test"
260281 migrationContext .OriginalTableName = "testing"
261282 migrationContext .SetConnectionConfig ("innodb" )
@@ -276,15 +297,11 @@ func (suite *ApplierTestSuite) TestInitDBConnections() {
276297func (suite * ApplierTestSuite ) TestApplyDMLEventQueries () {
277298 ctx := context .Background ()
278299
279- host , err := suite .mysqlContainer . ContainerIP (ctx )
300+ connectionConfig , err := suite .getConnectionConfig (ctx )
280301 suite .Require ().NoError (err )
281302
282303 migrationContext := base .NewMigrationContext ()
283- migrationContext .ApplierConnectionConfig = mysql .NewConnectionConfig ()
284- migrationContext .ApplierConnectionConfig .Key .Hostname = host
285- migrationContext .ApplierConnectionConfig .Key .Port = 3306
286- migrationContext .ApplierConnectionConfig .User = "root"
287- migrationContext .ApplierConnectionConfig .Password = "root-password"
304+ migrationContext .ApplierConnectionConfig = connectionConfig
288305 migrationContext .DatabaseName = "test"
289306 migrationContext .OriginalTableName = "testing"
290307 migrationContext .SetConnectionConfig ("innodb" )
@@ -320,7 +337,7 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
320337 suite .Require ().NoError (err )
321338
322339 // Check that the row was inserted
323- db , err := gosql . Open ( "mysql" , "root:root-password@tcp(" + host + ":3306)/test" )
340+ db , err := suite . getDb ( ctx )
324341 suite .Require ().NoError (err )
325342 defer db .Close ()
326343
@@ -347,15 +364,11 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
347364func (suite * ApplierTestSuite ) TestValidateOrDropExistingTables () {
348365 ctx := context .Background ()
349366
350- host , err := suite .mysqlContainer . ContainerIP (ctx )
367+ connectionConfig , err := suite .getConnectionConfig (ctx )
351368 suite .Require ().NoError (err )
352369
353370 migrationContext := base .NewMigrationContext ()
354- migrationContext .ApplierConnectionConfig = mysql .NewConnectionConfig ()
355- migrationContext .ApplierConnectionConfig .Key .Hostname = host
356- migrationContext .ApplierConnectionConfig .Key .Port = 3306
357- migrationContext .ApplierConnectionConfig .User = "root"
358- migrationContext .ApplierConnectionConfig .Password = "root-password"
371+ migrationContext .ApplierConnectionConfig = connectionConfig
359372 migrationContext .DatabaseName = "test"
360373 migrationContext .OriginalTableName = "testing"
361374 migrationContext .SetConnectionConfig ("innodb" )
@@ -374,6 +387,140 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() {
374387 suite .Require ().NoError (err )
375388}
376389
390+ func (suite * ApplierTestSuite ) TestApplyIterationInsertQuery () {
391+ ctx := context .Background ()
392+
393+ connectionConfig , err := suite .getConnectionConfig (ctx )
394+ suite .Require ().NoError (err )
395+
396+ migrationContext := base .NewMigrationContext ()
397+ migrationContext .ApplierConnectionConfig = connectionConfig
398+ migrationContext .DatabaseName = "test"
399+ migrationContext .OriginalTableName = "testing"
400+ migrationContext .ChunkSize = 10
401+ migrationContext .SetConnectionConfig ("innodb" )
402+
403+ db , err := suite .getDb (ctx )
404+ suite .Require ().NoError (err )
405+ defer db .Close ()
406+
407+ _ , err = db .Exec ("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))" )
408+ suite .Require ().NoError (err )
409+
410+ // Insert some test values
411+ for i := 1 ; i <= 10 ; i ++ {
412+ _ , err = db .Exec ("INSERT INTO test.testing (id, item_id) VALUES (?, ?)" , i , i )
413+ suite .Require ().NoError (err )
414+ }
415+
416+ migrationContext .SharedColumns = sql .NewColumnList ([]string {"id" , "item_id" })
417+ migrationContext .MappedSharedColumns = sql .NewColumnList ([]string {"id" , "item_id" })
418+ migrationContext .UniqueKey = & sql.UniqueKey {
419+ Name : "PRIMARY" ,
420+ Columns : * sql .NewColumnList ([]string {"id" }),
421+ }
422+
423+ migrationContext .MigrationIterationRangeMinValues = sql .ToColumnValues ([]interface {}{1 })
424+ migrationContext .MigrationIterationRangeMaxValues = sql .ToColumnValues ([]interface {}{10 })
425+
426+ applier := NewApplier (migrationContext )
427+ defer applier .Teardown ()
428+
429+ err = applier .InitDBConnections ()
430+ suite .Require ().NoError (err )
431+
432+ chunkSize , rowsAffected , duration , err := applier .ApplyIterationInsertQuery ()
433+ suite .Require ().NoError (err )
434+
435+ suite .Require ().Equal (migrationContext .ChunkSize , chunkSize )
436+ suite .Require ().Equal (int64 (10 ), rowsAffected )
437+ suite .Require ().Greater (duration , time .Duration (0 ))
438+
439+ // Check that the rows were inserted
440+ rows , err := db .Query ("SELECT * FROM test._testing_gho" )
441+ suite .Require ().NoError (err )
442+ defer rows .Close ()
443+
444+ var count , id , item_id int
445+ for rows .Next () {
446+ err = rows .Scan (& id , & item_id )
447+ suite .Require ().NoError (err )
448+ count += 1
449+ }
450+ suite .Require ().NoError (rows .Err ())
451+
452+ suite .Require ().Equal (10 , count )
453+ }
454+
455+ func (suite * ApplierTestSuite ) TestApplyIterationInsertQueryFailsFastWhenSelectingLockedRows () {
456+ ctx := context .Background ()
457+
458+ connectionConfig , err := suite .getConnectionConfig (ctx )
459+ suite .Require ().NoError (err )
460+
461+ migrationContext := base .NewMigrationContext ()
462+ migrationContext .ApplierConnectionConfig = connectionConfig
463+ migrationContext .DatabaseName = "test"
464+ migrationContext .OriginalTableName = "testing"
465+ migrationContext .ChunkSize = 10
466+ migrationContext .TableEngine = "innodb"
467+ migrationContext .SetConnectionConfig ("innodb" )
468+
469+ db , err := suite .getDb (ctx )
470+ suite .Require ().NoError (err )
471+ defer db .Close ()
472+
473+ _ , err = db .Exec ("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))" )
474+ suite .Require ().NoError (err )
475+
476+ // Insert some test values
477+ for i := 1 ; i <= 10 ; i ++ {
478+ _ , err = db .Exec ("INSERT INTO test.testing (id, item_id) VALUES (?, ?)" , i , i )
479+ suite .Require ().NoError (err )
480+ }
481+
482+ migrationContext .SharedColumns = sql .NewColumnList ([]string {"id" , "item_id" })
483+ migrationContext .MappedSharedColumns = sql .NewColumnList ([]string {"id" , "item_id" })
484+ migrationContext .UniqueKey = & sql.UniqueKey {
485+ Name : "PRIMARY" ,
486+ Columns : * sql .NewColumnList ([]string {"id" }),
487+ }
488+
489+ migrationContext .MigrationIterationRangeMinValues = sql .ToColumnValues ([]interface {}{1 })
490+ migrationContext .MigrationIterationRangeMaxValues = sql .ToColumnValues ([]interface {}{10 })
491+
492+ applier := NewApplier (migrationContext )
493+ defer applier .Teardown ()
494+
495+ err = applier .InitDBConnections ()
496+ suite .Require ().NoError (err )
497+
498+ // Lock one of the rows
499+ tx , err := db .Begin ()
500+ suite .Require ().NoError (err )
501+ defer func () {
502+ suite .Require ().NoError (tx .Rollback ())
503+ }()
504+
505+ _ , err = tx .Exec ("SELECT * FROM test.testing WHERE id = 5 FOR UPDATE" )
506+ suite .Require ().NoError (err )
507+
508+ chunkSize , rowsAffected , duration , err := applier .ApplyIterationInsertQuery ()
509+ suite .Require ().Error (err )
510+ suite .Require ().EqualError (err , "Error 3572 (HY000): Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set." )
511+
512+ suite .Require ().Equal (migrationContext .ChunkSize , chunkSize )
513+ suite .Require ().Equal (int64 (0 ), rowsAffected )
514+ suite .Require ().Equal (time .Duration (0 ), duration )
515+
516+ // Check that the no rows were inserted
517+ var count int
518+ err = db .QueryRow ("SELECT COUNT(*) FROM test._testing_gho" ).Scan (& count )
519+ suite .Require ().NoError (err )
520+
521+ suite .Require ().Equal (0 , count )
522+ }
523+
377524func TestApplier (t * testing.T ) {
378525 suite .Run (t , new (ApplierTestSuite ))
379526}
0 commit comments