Skip to content

Commit 79e4f1c

Browse files
author
Shlomi Noach
committed
detect range end based on OFFSET
1 parent 3abf748 commit 79e4f1c

3 files changed

Lines changed: 89 additions & 28 deletions

File tree

go/logic/applier.go

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -398,35 +398,41 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
398398
if this.migrationContext.MigrationIterationRangeMinValues == nil {
399399
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
400400
}
401-
query, explodedArgs, err := sql.BuildUniqueKeyRangeEndPreparedQuery(
402-
this.migrationContext.DatabaseName,
403-
this.migrationContext.OriginalTableName,
404-
&this.migrationContext.UniqueKey.Columns,
405-
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
406-
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
407-
atomic.LoadInt64(&this.migrationContext.ChunkSize),
408-
this.migrationContext.GetIteration() == 0,
409-
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
410-
)
411-
if err != nil {
412-
return hasFurtherRange, err
413-
}
414-
rows, err := this.db.Query(query, explodedArgs...)
415-
if err != nil {
416-
return hasFurtherRange, err
417-
}
418-
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
419-
for rows.Next() {
420-
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
401+
for i := 0; i < 2; i++ {
402+
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
403+
if i == 1 {
404+
buildFunc = sql.BuildUniqueKeyRangeEndPreparedQueryViaTemptable
405+
}
406+
query, explodedArgs, err := buildFunc(
407+
this.migrationContext.DatabaseName,
408+
this.migrationContext.OriginalTableName,
409+
&this.migrationContext.UniqueKey.Columns,
410+
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
411+
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
412+
atomic.LoadInt64(&this.migrationContext.ChunkSize),
413+
this.migrationContext.GetIteration() == 0,
414+
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
415+
)
416+
if err != nil {
421417
return hasFurtherRange, err
422418
}
423-
hasFurtherRange = true
424-
}
425-
if !hasFurtherRange {
426-
log.Debugf("Iteration complete: no further range to iterate")
427-
return hasFurtherRange, nil
419+
rows, err := this.db.Query(query, explodedArgs...)
420+
if err != nil {
421+
return hasFurtherRange, err
422+
}
423+
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
424+
for rows.Next() {
425+
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
426+
return hasFurtherRange, err
427+
}
428+
hasFurtherRange = true
429+
}
430+
if hasFurtherRange {
431+
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
432+
return hasFurtherRange, nil
433+
}
428434
}
429-
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
435+
log.Debugf("Iteration complete: no further range to iterate")
430436
return hasFurtherRange, nil
431437
}
432438

go/sql/builder.go

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,62 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa
231231
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable)
232232
}
233233

234-
func BuildUniqueKeyRangeEndPreparedQuery(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
234+
func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
235+
if uniqueKeyColumns.Len() == 0 {
236+
return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")
237+
}
238+
databaseName = EscapeName(databaseName)
239+
tableName = EscapeName(tableName)
240+
241+
var startRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign
242+
if includeRangeStartValues {
243+
startRangeComparisonSign = GreaterThanOrEqualsComparisonSign
244+
}
245+
rangeStartComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeStartArgs, startRangeComparisonSign)
246+
if err != nil {
247+
return "", explodedArgs, err
248+
}
249+
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
250+
rangeEndComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeEndArgs, LessThanOrEqualsComparisonSign)
251+
if err != nil {
252+
return "", explodedArgs, err
253+
}
254+
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
255+
256+
uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names())
257+
uniqueKeyColumnAscending := make([]string, len(uniqueKeyColumnNames), len(uniqueKeyColumnNames))
258+
uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumnNames), len(uniqueKeyColumnNames))
259+
for i, column := range uniqueKeyColumns.Columns() {
260+
uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i])
261+
if column.Type == EnumColumnType {
262+
uniqueKeyColumnAscending[i] = fmt.Sprintf("concat(%s) asc", uniqueKeyColumnNames[i])
263+
uniqueKeyColumnDescending[i] = fmt.Sprintf("concat(%s) desc", uniqueKeyColumnNames[i])
264+
} else {
265+
uniqueKeyColumnAscending[i] = fmt.Sprintf("%s asc", uniqueKeyColumnNames[i])
266+
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
267+
}
268+
}
269+
result = fmt.Sprintf(`
270+
select /* gh-ost %s.%s %s */
271+
%s
272+
from
273+
%s.%s
274+
where %s and %s
275+
order by
276+
%s
277+
limit 1
278+
offset %d
279+
`, databaseName, tableName, hint,
280+
strings.Join(uniqueKeyColumnNames, ", "),
281+
databaseName, tableName,
282+
rangeStartComparison, rangeEndComparison,
283+
strings.Join(uniqueKeyColumnAscending, ", "),
284+
(chunkSize - 1),
285+
)
286+
return result, explodedArgs, nil
287+
}
288+
289+
func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
235290
if uniqueKeyColumns.Len() == 0 {
236291
return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")
237292
}

go/sql/builder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
283283
rangeStartArgs := []interface{}{3, 17}
284284
rangeEndArgs := []interface{}{103, 117}
285285

286-
query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQuery(databaseName, originalTableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, false, "test")
286+
query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, originalTableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, false, "test")
287287
test.S(t).ExpectNil(err)
288288
expected := `
289289
select /* gh-ost mydb.tbl test */ name, position

0 commit comments

Comments
 (0)