Skip to content

Commit 3144848

Browse files
authored
Optimize ApproximateQuantiles by removing unused context and adding registrations. (#38085)
Fix #38084
1 parent d1d5469 commit 3144848

1 file changed

Lines changed: 13 additions & 7 deletions

File tree

sdks/go/pkg/beam/transforms/stats/quantiles.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package stats
2020
import (
2121
"bytes"
2222
"container/heap"
23-
"context"
2423
"encoding/gob"
2524
"encoding/json"
2625
"hash/crc32"
@@ -48,8 +47,15 @@ func init() {
4847

4948
register.Function1x2(fixedKey)
5049
register.Function2x1(makeWeightedElement)
50+
register.Combiner3[*compactors, weightedElement, *compactors](&approximateQuantilesInputFn{})
51+
register.Combiner3[*compactors, *compactors, *compactors](&approximateQuantilesMergeOnlyFn{})
52+
register.Combiner3[*compactors, *compactors, []beam.T](&approximateQuantilesOutputFn{})
53+
register.DoFn1x2[beam.T, int, beam.T](&shardElementsFn{})
54+
register.Function0x1[error](setupResultShim)
5155
}
5256

57+
func setupResultShim() error { return nil }
58+
5359
// Opts contains settings used to configure how approximate quantiles are computed.
5460
type Opts struct {
5561
// Controls the memory used and approximation error (difference between the quantile returned and the true quantile.)
@@ -493,7 +499,7 @@ func (f *approximateQuantilesOutputFn) AddInput(compactors *compactors, element
493499
return compactors
494500
}
495501

496-
func (f *approximateQuantilesOutputFn) MergeAccumulators(ctx context.Context, a, b *compactors) *compactors {
502+
func (f *approximateQuantilesOutputFn) MergeAccumulators(a, b *compactors) *compactors {
497503
a.merge(b, f.State.less)
498504
return a
499505
}
@@ -559,7 +565,7 @@ func toWeightedSlice(compactor compactor, less reflectx.Func2x1, weight int) []w
559565
}
560566
return weightedElements
561567
}
562-
func (f *approximateQuantilesOutputFn) ExtractOutput(ctx context.Context, compactors *compactors) []beam.T {
568+
func (f *approximateQuantilesOutputFn) ExtractOutput(compactors *compactors) []beam.T {
563569
sorted := toWeightedSlice(compactors.Compactors[0], f.State.less, 1)
564570
for level, compactor := range compactors.Compactors[1:] {
565571
sorted = mergeSortedWeighted(sorted, toWeightedSlice(compactor, f.State.less, 1<<uint(level)), func(a, b any) bool {
@@ -605,12 +611,12 @@ func (f *approximateQuantilesInputFn) AddInput(compactors *compactors, element w
605611
return compactors
606612
}
607613

608-
func (f *approximateQuantilesInputFn) MergeAccumulators(ctx context.Context, a, b *compactors) *compactors {
614+
func (f *approximateQuantilesInputFn) MergeAccumulators(a, b *compactors) *compactors {
609615
a.merge(b, f.State.less)
610616
return a
611617
}
612618

613-
func (f *approximateQuantilesInputFn) ExtractOutput(ctx context.Context, compactors *compactors) *compactors {
619+
func (f *approximateQuantilesInputFn) ExtractOutput(compactors *compactors) *compactors {
614620
for i := range compactors.Compactors {
615621
// Sort the compactors here so when we're merging them for the final output, they're already sorted and we can merge elements in order.
616622
compactors.Compactors[i].sort(f.State.less)
@@ -634,12 +640,12 @@ func (f *approximateQuantilesMergeOnlyFn) AddInput(compactors *compactors, eleme
634640
return compactors
635641
}
636642

637-
func (f *approximateQuantilesMergeOnlyFn) MergeAccumulators(ctx context.Context, a, b *compactors) *compactors {
643+
func (f *approximateQuantilesMergeOnlyFn) MergeAccumulators(a, b *compactors) *compactors {
638644
a.merge(b, f.State.less)
639645
return a
640646
}
641647

642-
func (f *approximateQuantilesMergeOnlyFn) ExtractOutput(ctx context.Context, compactors *compactors) *compactors {
648+
func (f *approximateQuantilesMergeOnlyFn) ExtractOutput(compactors *compactors) *compactors {
643649
for i := range compactors.Compactors {
644650
// Sort the compactors here so when we're merging them for the final output, they're already sorted and we can merge elements in order.
645651
compactors.Compactors[i].sort(f.State.less)

0 commit comments

Comments
 (0)