@@ -83,11 +83,13 @@ type Migrator struct {
8383 applyEventsQueue chan * applyEventStruct
8484
8585 handledChangelogStates map [string ]bool
86+
87+ finishedMigrating int64
8688}
8789
88- func NewMigrator () * Migrator {
90+ func NewMigrator (context * base. MigrationContext ) * Migrator {
8991 migrator := & Migrator {
90- migrationContext : base . GetMigrationContext () ,
92+ migrationContext : context ,
9193 parser : sql .NewParser (),
9294 ghostTableMigrated : make (chan bool ),
9395 firstThrottlingCollected : make (chan bool , 3 ),
@@ -97,13 +99,14 @@ func NewMigrator() *Migrator {
9799 copyRowsQueue : make (chan tableWriteFunc ),
98100 applyEventsQueue : make (chan * applyEventStruct , base .MaxEventsBatchSize ),
99101 handledChangelogStates : make (map [string ]bool ),
102+ finishedMigrating : 0 ,
100103 }
101104 return migrator
102105}
103106
104107// initiateHooksExecutor
105108func (this * Migrator ) initiateHooksExecutor () (err error ) {
106- this .hooksExecutor = NewHooksExecutor ()
109+ this .hooksExecutor = NewHooksExecutor (this . migrationContext )
107110 if err := this .hooksExecutor .initHooks (); err != nil {
108111 return err
109112 }
@@ -299,6 +302,11 @@ func (this *Migrator) Migrate() (err error) {
299302 if err := this .validateStatement (); err != nil {
300303 return err
301304 }
305+
306+ // After this point, we'll need to teardown anything that's been started
307+ // so we don't leave things hanging around
308+ defer this .teardown ()
309+
302310 if err := this .initiateInspector (); err != nil {
303311 return err
304312 }
@@ -653,7 +661,7 @@ func (this *Migrator) initiateServer() (err error) {
653661 var f printStatusFunc = func (rule PrintStatusRule , writer io.Writer ) {
654662 this .printStatus (rule , writer )
655663 }
656- this .server = NewServer (this .hooksExecutor , f )
664+ this .server = NewServer (this .migrationContext , this . hooksExecutor , f )
657665 if err := this .server .BindSocketFile (); err != nil {
658666 return err
659667 }
@@ -673,7 +681,7 @@ func (this *Migrator) initiateServer() (err error) {
673681// - heartbeat
674682// When `--allow-on-master` is supplied, the inspector is actually the master.
675683func (this * Migrator ) initiateInspector () (err error ) {
676- this .inspector = NewInspector ()
684+ this .inspector = NewInspector (this . migrationContext )
677685 if err := this .inspector .InitDBConnections (); err != nil {
678686 return err
679687 }
@@ -733,6 +741,9 @@ func (this *Migrator) initiateStatus() error {
733741 this .printStatus (ForcePrintStatusAndHintRule )
734742 statusTick := time .Tick (1 * time .Second )
735743 for range statusTick {
744+ if atomic .LoadInt64 (& this .finishedMigrating ) > 0 {
745+ return nil
746+ }
736747 go this .printStatus (HeuristicPrintStatusRule )
737748 }
738749
@@ -932,7 +943,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
932943
933944// initiateStreaming begins streaming of binary log events and registers listeners for such events
934945func (this * Migrator ) initiateStreaming () error {
935- this .eventsStreamer = NewEventsStreamer ()
946+ this .eventsStreamer = NewEventsStreamer (this . migrationContext )
936947 if err := this .eventsStreamer .InitDBConnections (); err != nil {
937948 return err
938949 }
@@ -957,6 +968,9 @@ func (this *Migrator) initiateStreaming() error {
957968 go func () {
958969 ticker := time .Tick (1 * time .Second )
959970 for range ticker {
971+ if atomic .LoadInt64 (& this .finishedMigrating ) > 0 {
972+ return
973+ }
960974 this .migrationContext .SetRecentBinlogCoordinates (* this .eventsStreamer .GetCurrentBinlogCoordinates ())
961975 }
962976 }()
@@ -980,7 +994,7 @@ func (this *Migrator) addDMLEventsListener() error {
980994
981995// initiateThrottler kicks in the throttling collection and the throttling checks.
982996func (this * Migrator ) initiateThrottler () error {
983- this .throttler = NewThrottler (this .applier , this .inspector )
997+ this .throttler = NewThrottler (this .migrationContext , this . applier , this .inspector )
984998
985999 go this .throttler .initiateThrottlerCollection (this .firstThrottlingCollected )
9861000 log .Infof ("Waiting for first throttle metrics to be collected" )
@@ -994,7 +1008,7 @@ func (this *Migrator) initiateThrottler() error {
9941008}
9951009
9961010func (this * Migrator ) initiateApplier () error {
997- this .applier = NewApplier ()
1011+ this .applier = NewApplier (this . migrationContext )
9981012 if err := this .applier .InitDBConnections (); err != nil {
9991013 return err
10001014 }
@@ -1147,6 +1161,10 @@ func (this *Migrator) executeWriteFuncs() error {
11471161 return nil
11481162 }
11491163 for {
1164+ if atomic .LoadInt64 (& this .finishedMigrating ) > 0 {
1165+ return nil
1166+ }
1167+
11501168 this .throttler .throttle (nil )
11511169
11521170 // We give higher priority to event processing, then secondary priority to
@@ -1226,3 +1244,22 @@ func (this *Migrator) finalCleanup() error {
12261244
12271245 return nil
12281246}
1247+
1248+ func (this * Migrator ) teardown () {
1249+ atomic .StoreInt64 (& this .finishedMigrating , 1 )
1250+
1251+ if this .inspector != nil {
1252+ log .Infof ("Tearing down inspector" )
1253+ this .inspector .Teardown ()
1254+ }
1255+
1256+ if this .applier != nil {
1257+ log .Infof ("Tearing down applier" )
1258+ this .applier .Teardown ()
1259+ }
1260+
1261+ if this .eventsStreamer != nil {
1262+ log .Infof ("Tearing down streamer" )
1263+ this .eventsStreamer .Teardown ()
1264+ }
1265+ }
0 commit comments