Skip to content

Commit 7517238

Browse files
author
Shlomi Noach
authored
Merge branch 'master' into fix-infinite-cutover-loop
2 parents 289ce46 + e0cf2ad commit 7517238

168 files changed

Lines changed: 25985 additions & 389 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

doc/command-line-flags.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,7 @@ See `approve-renamed-columns`
130130
### test-on-replica
131131

132132
Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [testing-on-replica](testing-on-replica.md)
133+
134+
### timestamp-old-table
135+
136+
Makes the _old_ table include a timestamp value. The _old_ table is what the original table is renamed to at the end of a successful migration. For example, if the table is `gh_ost_test`, then the _old_ table would normally be `_gh_ost_test_del`. With `--timestamp-old-table` it would be, for example, `_gh_ost_test_20170221103147_del`.

go/base/context.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ type MigrationContext struct {
121121
OkToDropTable bool
122122
InitiallyDropOldTable bool
123123
InitiallyDropGhostTable bool
124+
TimestampOldTable bool // Should old table name include a timestamp
124125
CutOverType CutOver
125126
ReplicaServerId uint
126127

@@ -234,11 +235,12 @@ func (this *MigrationContext) GetGhostTableName() string {
234235

235236
// GetOldTableName generates the name of the "old" table, into which the original table is renamed.
236237
func (this *MigrationContext) GetOldTableName() string {
237-
if this.TestOnReplica {
238-
return fmt.Sprintf("_%s_ght", this.OriginalTableName)
239-
}
240-
if this.MigrateOnReplica {
241-
return fmt.Sprintf("_%s_ghr", this.OriginalTableName)
238+
if this.TimestampOldTable {
239+
t := this.StartTime
240+
timestamp := fmt.Sprintf("%d%02d%02d%02d%02d%02d",
241+
t.Year(), t.Month(), t.Day(),
242+
t.Hour(), t.Minute(), t.Second())
243+
return fmt.Sprintf("_%s_%s_del", this.OriginalTableName, timestamp)
242244
}
243245
return fmt.Sprintf("_%s_del", this.OriginalTableName)
244246
}

go/binlog/gomysql_reader.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/outbrain/golib/log"
1717
gomysql "github.com/siddontang/go-mysql/mysql"
1818
"github.com/siddontang/go-mysql/replication"
19+
"golang.org/x/net/context"
1920
)
2021

2122
type GoMySQLReader struct {
@@ -39,7 +40,16 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G
3940
}
4041

4142
serverId := uint32(binlogReader.MigrationContext.ReplicaServerId)
42-
binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql")
43+
44+
binlogSyncerConfig := &replication.BinlogSyncerConfig{
45+
ServerID: serverId,
46+
Flavor: "mysql",
47+
Host: connectionConfig.Key.Hostname,
48+
Port: uint16(connectionConfig.Key.Port),
49+
User: connectionConfig.User,
50+
Password: connectionConfig.Password,
51+
}
52+
binlogReader.binlogSyncer = replication.NewBinlogSyncer(binlogSyncerConfig)
4353

4454
return binlogReader, err
4555
}
@@ -49,10 +59,6 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin
4959
if coordinates.IsEmpty() {
5060
return log.Errorf("Emptry coordinates at ConnectBinlogStreamer()")
5161
}
52-
log.Infof("Registering replica at %+v:%+v", this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port))
53-
if err := this.binlogSyncer.RegisterSlave(this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port), this.connectionConfig.User, this.connectionConfig.Password); err != nil {
54-
return err
55-
}
5662

5763
this.currentCoordinates = coordinates
5864
log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates)
@@ -126,7 +132,7 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
126132
if canStopStreaming() {
127133
break
128134
}
129-
ev, err := this.binlogStreamer.GetEvent()
135+
ev, err := this.binlogStreamer.GetEvent(context.Background())
130136
if err != nil {
131137
return err
132138
}

go/cmd/gh-ost/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ func main() {
7777
flag.BoolVar(&migrationContext.OkToDropTable, "ok-to-drop-table", false, "Shall the tool drop the old table at end of operation. DROPping tables can be a long locking operation, which is why I'm not doing it by default. I'm an online tool, yes?")
7878
flag.BoolVar(&migrationContext.InitiallyDropOldTable, "initially-drop-old-table", false, "Drop a possibly existing OLD table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
7979
flag.BoolVar(&migrationContext.InitiallyDropGhostTable, "initially-drop-ghost-table", false, "Drop a possibly existing Ghost table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
80+
flag.BoolVar(&migrationContext.TimestampOldTable, "timestamp-old-table", false, "Use a timestamp in old table name. This makes old table names unique and non conflicting cross migrations")
8081
cutOver := flag.String("cut-over", "atomic", "choose cut-over type (default|atomic, two-step)")
8182
flag.BoolVar(&migrationContext.ForceNamedCutOverCommand, "force-named-cut-over", false, "When true, the 'unpostpone|cut-over' interactive command must name the migrated table")
8283

go/logic/applier.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ func (this *Applier) ValidateOrDropExistingTables() error {
142142
return err
143143
}
144144
}
145+
if len(this.migrationContext.GetOldTableName()) > mysql.MaxTableNameLength {
146+
log.Fatalf("--timestamp-old-table defined, but resulting table name (%s) is too long (only %d characters allowed)", this.migrationContext.GetOldTableName(), mysql.MaxTableNameLength)
147+
}
148+
145149
if this.tableExists(this.migrationContext.GetOldTableName()) {
146150
return fmt.Errorf("Table %s already exists. Panicking. Use --initially-drop-old-table to force dropping it, though I really prefer that you drop it or rename it away", sql.EscapeName(this.migrationContext.GetOldTableName()))
147151
}

go/logic/inspect.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ func (this *Inspector) validateLogSlaveUpdates() error {
348348
}
349349

350350
if this.migrationContext.IsTungsten {
351-
log.Warning("log_slave_updates not found on %s:%d, but --tungsten provided, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
351+
log.Warningf("log_slave_updates not found on %s:%d, but --tungsten provided, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
352352
return nil
353353
}
354354

@@ -357,7 +357,7 @@ func (this *Inspector) validateLogSlaveUpdates() error {
357357
}
358358

359359
if this.migrationContext.InspectorIsAlsoApplier() {
360-
log.Warning("log_slave_updates not found on %s:%d, but executing directly on master, so I'm proceeeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
360+
log.Warningf("log_slave_updates not found on %s:%d, but executing directly on master, so I'm proceeeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
361361
return nil
362362
}
363363

go/logic/migrator.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1030,11 +1030,13 @@ func (this *Migrator) iterateChunks() error {
10301030
for {
10311031
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
10321032
// Done
1033+
// There's another such check down the line
10331034
return nil
10341035
}
10351036
copyRowsFunc := func() error {
10361037
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
1037-
// Done
1038+
// Done.
1039+
// There's another such check down the line
10381040
return nil
10391041
}
10401042
hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues()
@@ -1046,6 +1048,17 @@ func (this *Migrator) iterateChunks() error {
10461048
}
10471049
// Copy task:
10481050
applyCopyRowsFunc := func() error {
1051+
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
1052+
// No need for more writes.
1053+
// This is the de-facto place where we avoid writing in the event of completed cut-over.
1054+
// There could _still_ be a race condition, but that's as close as we can get.
1055+
// What about the race condition? Well, there's actually no data integrity issue.
1056+
// when rowCopyCompleteFlag==1 that means **guaranteed** all necessary rows have been copied.
1057+
// But some are still then collected at the binary log, and these are the ones we're trying to
1058+
// not apply here. If the race condition wins over us, then we just attempt to apply onto the
1059+
// _ghost_ table, which no longer exists. So, bothering error messages and all, but no damage.
1060+
return nil
1061+
}
10491062
_, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery()
10501063
if err != nil {
10511064
return terminateRowIteration(err)

go/mysql/utils.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"github.com/outbrain/golib/sqlutils"
1717
)
1818

19+
const MaxTableNameLength = 64
20+
1921
type ReplicationLagResult struct {
2022
Key InstanceKey
2123
Lag time.Duration

vendor/github.com/ngaut/deadline/rw.go

Lines changed: 50 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/ngaut/log/LICENSE

Lines changed: 165 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)