Skip to content

Commit 9226769

Browse files
author
Shlomi Noach
authored
Merge pull request #528 from nikhilmat/nm-teardown-contrib
Implement Teardown (added back DB cache)
2 parents 7ef284a + e82d563 commit 9226769

7 files changed

Lines changed: 68 additions & 24 deletions

File tree

go/base/context.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"sync/atomic"
1515
"time"
1616

17+
"github.com/satori/go.uuid"
18+
1719
"github.com/github/gh-ost/go/mysql"
1820
"github.com/github/gh-ost/go/sql"
1921

@@ -71,6 +73,8 @@ func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleRea
7173
// MigrationContext has the general, global state of migration. It is used by
7274
// all components throughout the migration process.
7375
type MigrationContext struct {
76+
Uuid string
77+
7478
DatabaseName string
7579
OriginalTableName string
7680
AlterStatement string
@@ -212,6 +216,7 @@ type ContextConfig struct {
212216

213217
func NewMigrationContext() *MigrationContext {
214218
return &MigrationContext{
219+
Uuid: uuid.NewV4().String(),
215220
defaultNumRetries: 60,
216221
ChunkSize: 1000,
217222
InspectorConnectionConfig: mysql.NewConnectionConfig(),

go/binlog/gomysql_reader.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,6 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
158158
}
159159

160160
func (this *GoMySQLReader) Close() error {
161-
// Historically there was a:
162-
// this.binlogSyncer.Close()
163-
// here. A new go-mysql version closes the binlog syncer connection independently.
164-
// I will go against the sacred rules of comments and just leave this here.
165-
// This is the year 2017. Let's see what year these comments get deleted.
161+
this.binlogSyncer.Close()
166162
return nil
167163
}

go/logic/applier.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,13 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
6868
}
6969

7070
func (this *Applier) InitDBConnections() (err error) {
71+
7172
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
72-
if this.db, err = mysql.GetDB(applierUri); err != nil {
73+
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
7374
return err
7475
}
7576
singletonApplierUri := fmt.Sprintf("%s?timeout=0", applierUri)
76-
if this.singletonDB, err = mysql.GetDB(singletonApplierUri); err != nil {
77+
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
7778
return err
7879
}
7980
this.singletonDB.SetMaxOpenConns(1)

go/logic/inspect.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ func NewInspector(migrationContext *base.MigrationContext) *Inspector {
4141

4242
func (this *Inspector) InitDBConnections() (err error) {
4343
inspectorUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
44-
if this.db, err = mysql.GetDB(inspectorUri); err != nil {
44+
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, inspectorUri); err != nil {
4545
return err
4646
}
4747

4848
informationSchemaUri := this.connectionConfig.GetDBUri("information_schema")
49-
if this.informationSchemaDb, err = mysql.GetDB(informationSchemaUri); err != nil {
49+
if this.informationSchemaDb, _, err = mysql.GetDB(this.migrationContext.Uuid, informationSchemaUri); err != nil {
5050
return err
5151
}
5252

go/logic/streamer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent)
104104

105105
func (this *EventsStreamer) InitDBConnections() (err error) {
106106
EventsStreamerUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
107-
if this.db, err = mysql.GetDB(EventsStreamerUri); err != nil {
107+
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri); err != nil {
108108
return err
109109
}
110110
if _, err := base.ValidateConnection(this.db, this.connectionConfig); err != nil {

go/logic/throttler.go

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,18 @@ const frenoMagicHint = "freno"
4141
// Throttler collects metrics related to throttling and makes informed decision
4242
// whether throttling should take place.
4343
type Throttler struct {
44-
migrationContext *base.MigrationContext
45-
applier *Applier
46-
inspector *Inspector
44+
migrationContext *base.MigrationContext
45+
applier *Applier
46+
inspector *Inspector
47+
finishedMigrating int64
4748
}
4849

4950
func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector) *Throttler {
5051
return &Throttler{
51-
migrationContext: migrationContext,
52-
applier: applier,
53-
inspector: inspector,
52+
migrationContext: migrationContext,
53+
applier: applier,
54+
inspector: inspector,
55+
finishedMigrating: 0,
5456
}
5557
}
5658

@@ -159,6 +161,9 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo
159161

160162
ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
161163
for range ticker {
164+
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
165+
return
166+
}
162167
go collectFunc()
163168
}
164169
}
@@ -181,7 +186,7 @@ func (this *Throttler) collectControlReplicasLag() {
181186
dbUri := connectionConfig.GetDBUri("information_schema")
182187

183188
var heartbeatValue string
184-
if db, err := mysql.GetDB(dbUri); err != nil {
189+
if db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri); err != nil {
185190
return lag, err
186191
} else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
187192
return lag, err
@@ -233,6 +238,9 @@ func (this *Throttler) collectControlReplicasLag() {
233238
shouldReadLagAggressively := false
234239

235240
for range aggressiveTicker {
241+
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
242+
return
243+
}
236244
if counter%relaxedFactor == 0 {
237245
// we only check if we wish to be aggressive once per second. The parameters for being aggressive
238246
// do not typically change at all throughout the migration, but nonetheless we check them.
@@ -285,6 +293,10 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<-
285293

286294
ticker := time.Tick(100 * time.Millisecond)
287295
for range ticker {
296+
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
297+
return
298+
}
299+
288300
if sleep, _ := collectFunc(); sleep {
289301
time.Sleep(1 * time.Second)
290302
}
@@ -393,6 +405,10 @@ func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan
393405

394406
throttlerMetricsTick := time.Tick(1 * time.Second)
395407
for range throttlerMetricsTick {
408+
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
409+
return
410+
}
411+
396412
this.collectGeneralThrottleMetrics()
397413
}
398414
}()
@@ -419,6 +435,9 @@ func (this *Throttler) initiateThrottlerChecks() error {
419435
}
420436
throttlerFunction()
421437
for range throttlerTick {
438+
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
439+
return nil
440+
}
422441
throttlerFunction()
423442
}
424443

@@ -440,3 +459,8 @@ func (this *Throttler) throttle(onThrottled func()) {
440459
time.Sleep(250 * time.Millisecond)
441460
}
442461
}
462+
463+
func (this *Throttler) Teardown() {
464+
log.Debugf("Tearing down...")
465+
atomic.StoreInt64(&this.finishedMigrating, 1)
466+
}

go/mysql/utils.go

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package mysql
88
import (
99
gosql "database/sql"
1010
"fmt"
11+
"sync"
1112
"time"
1213

1314
"github.com/github/gh-ost/go/sql"
@@ -33,13 +34,27 @@ func (this *ReplicationLagResult) HasLag() bool {
3334
return this.Lag > 0
3435
}
3536

36-
func GetDB(mysql_uri string) (*gosql.DB, error) {
37-
db, err := gosql.Open("mysql", mysql_uri)
38-
if err == nil {
39-
return db, nil
40-
} else {
41-
return nil, err
37+
// knownDBs is a DB cache by uri
38+
var knownDBs map[string]*gosql.DB = make(map[string]*gosql.DB)
39+
var knownDBsMutex = &sync.Mutex{}
40+
41+
func GetDB(migrationUuid string, mysql_uri string) (*gosql.DB, bool, error) {
42+
cacheKey := migrationUuid + ":" + mysql_uri
43+
44+
knownDBsMutex.Lock()
45+
defer func() {
46+
knownDBsMutex.Unlock()
47+
}()
48+
49+
var exists bool
50+
if _, exists = knownDBs[cacheKey]; !exists {
51+
if db, err := gosql.Open("mysql", mysql_uri); err == nil {
52+
knownDBs[cacheKey] = db
53+
} else {
54+
return db, exists, err
55+
}
4256
}
57+
return knownDBs[cacheKey], exists, nil
4358
}
4459

4560
// GetReplicationLag returns replication lag for a given connection config; either by explicit query
@@ -62,7 +77,10 @@ func GetReplicationLag(informationSchemaDb *gosql.DB, connectionConfig *Connecti
6277
func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) {
6378
currentUri := connectionConfig.GetDBUri("information_schema")
6479
// This function is only called once, okay to not have a cached connection pool
65-
db, err := GetDB(currentUri)
80+
db, err := gosql.Open("mysql", currentUri)
81+
if err != nil {
82+
return nil, err
83+
}
6684
defer db.Close()
6785

6886
if err != nil {

0 commit comments

Comments
 (0)