Skip to content

Commit 6fa32f6

Browse files
author
Shlomi Noach
committed
cut-over failure on test-on-replica starts replication again
1 parent 5af7026 commit 6fa32f6

3 files changed

Lines changed: 69 additions & 9 deletions

File tree

go/logic/applier.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -588,11 +588,22 @@ func (this *Applier) RenameTablesRollback() (renameError error) {
588588
// and have them written to the binary log, so that we can then read them via streamer.
589589
func (this *Applier) StopSlaveIOThread() error {
590590
query := `stop /* gh-ost */ slave io_thread`
591-
log.Infof("Stopping replication")
591+
log.Infof("Stopping replication IO thread")
592592
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
593593
return err
594594
}
595-
log.Infof("Replication stopped")
595+
log.Infof("Replication IO thread stopped")
596+
return nil
597+
}
598+
599+
// StartSlaveIOThread is applicable with --test-on-replica
600+
func (this *Applier) StartSlaveIOThread() error {
601+
query := `start /* gh-ost */ slave io_thread`
602+
log.Infof("Starting replication IO thread")
603+
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
604+
return err
605+
}
606+
log.Infof("Replication IO thread started")
596607
return nil
597608
}
598609

@@ -635,6 +646,18 @@ func (this *Applier) StopReplication() error {
635646
return nil
636647
}
637648

649+
// StartReplication is used by `--test-on-replica` on cut-over failure
650+
func (this *Applier) StartReplication() error {
651+
if err := this.StartSlaveIOThread(); err != nil {
652+
return err
653+
}
654+
if err := this.StartSlaveSQLThread(); err != nil {
655+
return err
656+
}
657+
log.Infof("Replication started")
658+
return nil
659+
}
660+
638661
// GetSessionLockName returns a name for the special hint session voluntary lock
639662
func (this *Applier) GetSessionLockName(sessionId int64) string {
640663
return fmt.Sprintf("gh-ost.%d.lock", sessionId)

go/logic/hooks.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
onFailure = "gh-ost-on-failure"
3131
onStatus = "gh-ost-on-status"
3232
onStopReplication = "gh-ost-on-stop-replication"
33+
onStartReplication = "gh-ost-on-start-replication"
3334
)
3435

3536
type HooksExecutor struct {
@@ -152,3 +153,7 @@ func (this *HooksExecutor) onStatus(statusMessage string) error {
152153
func (this *HooksExecutor) onStopReplication() error {
153154
return this.executeHooks(onStopReplication)
154155
}
156+
157+
func (this *HooksExecutor) onStartReplication() error {
158+
return this.executeHooks(onStartReplication)
159+
}

go/logic/migrator.go

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -385,8 +385,38 @@ func (this *Migrator) ExecOnFailureHook() (err error) {
385385
return this.hooksExecutor.onFailure()
386386
}
387387

388+
func (this *Migrator) handleCutOverResult(cutOverError error) (err error) {
389+
if this.migrationContext.TestOnReplica {
390+
// We're merly testing, we don't want to keep this state. Rollback the renames as possible
391+
this.applier.RenameTablesRollback()
392+
}
393+
if cutOverError == nil {
394+
return nil
395+
}
396+
// Only on error:
397+
398+
if this.migrationContext.TestOnReplica {
399+
// With `--test-on-replica` we stop replication thread, and then proceed to use
400+
// the same cut-over phase as the master would use. That means we take locks
401+
// and swap the tables.
402+
// The difference is that we will later swap the tables back.
403+
if err := this.hooksExecutor.onStartReplication(); err != nil {
404+
return log.Errore(err)
405+
}
406+
if this.migrationContext.TestOnReplicaSkipReplicaStop {
407+
log.Warningf("--test-on-replica-skip-replica-stop enabled, we are not starting replication.")
408+
} else {
409+
log.Debugf("testing on replica. Starting replication IO thread after cut-over failure")
410+
if err := this.retryOperation(this.applier.StartReplication); err != nil {
411+
return log.Errore(err)
412+
}
413+
}
414+
}
415+
return nil
416+
}
417+
388418
// cutOver performs the final step of migration, based on migration
389-
// type (on replica? bumpy? safe?)
419+
// type (on replica? atomic? safe?)
390420
func (this *Migrator) cutOver() (err error) {
391421
if this.migrationContext.Noop {
392422
log.Debugf("Noop operation; not really swapping tables")
@@ -441,18 +471,18 @@ func (this *Migrator) cutOver() (err error) {
441471
return err
442472
}
443473
}
444-
// We're merly testing, we don't want to keep this state. Rollback the renames as possible
445-
defer this.applier.RenameTablesRollback()
446-
// We further proceed to do the cutover by normal means; the 'defer' above will rollback the swap
447474
}
448475
if this.migrationContext.CutOverType == base.CutOverAtomic {
449476
// Atomic solution: we use low timeout and multiple attempts. But for
450477
// each failed attempt, we throttle until replication lag is back to normal
451478
err := this.atomicCutOver()
479+
this.handleCutOverResult(err)
452480
return err
453481
}
454482
if this.migrationContext.CutOverType == base.CutOverTwoStep {
455-
return this.cutOverTwoStep()
483+
err := this.cutOverTwoStep()
484+
this.handleCutOverResult(err)
485+
return err
456486
}
457487
return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
458488
}
@@ -1043,8 +1073,10 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
10431073

10441074
availableEvents := len(this.applyEventsQueue)
10451075
batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
1046-
if availableEvents > batchSize {
1047-
availableEvents = batchSize
1076+
if availableEvents > batchSize-1 {
1077+
// The "- 1" is because we already consumed one event: the original event that led to this function getting called.
1078+
// So, if DMLBatchSize==1 we wish to not process any further events
1079+
availableEvents = batchSize - 1
10481080
}
10491081
for i := 0; i < availableEvents; i++ {
10501082
additionalStruct := <-this.applyEventsQueue

0 commit comments

Comments
 (0)