Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit b28eea9

Browse files
Adding mergeall optimization
1 parent 11c56e0 commit b28eea9

21 files changed

Lines changed: 625 additions & 214 deletions

Gruntfile.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ var browsers = [{
123123
'src/core/linq/observable/concatall.js',
124124
'src/core/linq/observable/mergeproto.js',
125125
'src/core/linq/observable/merge.js',
126-
'src/core/linq/observable/mergeall.js',
126+
'src/core/perf/operators/mergeall.js',
127127
'src/core/linq/observable/onerrorresumenextproto.js',
128128
'src/core/linq/observable/onerrorresumenext.js',
129129
'src/core/linq/observable/skipuntil.js',
@@ -411,7 +411,7 @@ var browsers = [{
411411
'src/core/linq/observable/concatall.js',
412412
'src/core/linq/observable/mergeproto.js',
413413
'src/core/linq/observable/merge.js',
414-
'src/core/linq/observable/mergeall.js',
414+
'src/core/perf/operators/mergeall.js',
415415
'src/core/linq/observable/onerrorresumenextproto.js',
416416
'src/core/linq/observable/onerrorresumenext.js',
417417
'src/core/linq/observable/skipuntil.js',
@@ -699,7 +699,7 @@ var browsers = [{
699699
'src/core/linq/observable/concatall.js',
700700
'src/core/linq/observable/mergeproto.js',
701701
'src/core/linq/observable/merge.js',
702-
'src/core/linq/observable/mergeall.js',
702+
'src/core/perf/operators/mergeall.js',
703703
'src/core/linq/observable/onerrorresumenextproto.js',
704704
'src/core/linq/observable/onerrorresumenext.js',
705705
'src/core/linq/observable/skipuntil.js',
@@ -843,7 +843,7 @@ var browsers = [{
843843
'src/core/linq/observable/concatall.js',
844844
'src/core/linq/observable/mergeproto.js',
845845
'src/core/linq/observable/merge.js',
846-
'src/core/linq/observable/mergeall.js',
846+
'src/core/perf/operators/mergeall.js',
847847
'src/core/linq/observable/onerrorresumenextproto.js',
848848
'src/core/linq/observable/onerrorresumenext.js',
849849
'src/core/linq/observable/skipuntil.js',
@@ -973,7 +973,7 @@ var browsers = [{
973973
'src/core/linq/observable/concatall.js',
974974
'src/core/linq/observable/mergeproto.js',
975975
'src/core/linq/observable/merge.js',
976-
'src/core/linq/observable/mergeall.js',
976+
'src/core/perf/operators/mergeall.js',
977977
'src/core/linq/observable/skipuntil.js',
978978
'src/core/linq/observable/switch.js',
979979
'src/core/linq/observable/takeuntil.js',
@@ -1138,7 +1138,7 @@ var browsers = [{
11381138
'src/core/linq/observable/concatall.js',
11391139
'src/core/linq/observable/mergeproto.js',
11401140
'src/core/linq/observable/merge.js',
1141-
'src/core/linq/observable/mergeall.js',
1141+
'src/core/perf/operators/mergeall.js',
11421142
'src/core/linq/observable/skipuntil.js',
11431143
'src/core/linq/observable/switch.js',
11441144
'src/core/linq/observable/takeuntil.js',

dist/rx.all.compat.js

Lines changed: 70 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3308,43 +3308,82 @@
33083308
return observableOf(scheduler, sources).mergeAll();
33093309
};
33103310

3311-
/**
3312-
* Merges an observable sequence of observable sequences into an observable sequence.
3313-
* @returns {Observable} The observable sequence that merges the elements of the inner sequences.
3314-
*/
3315-
observableProto.mergeAll = function () {
3316-
var sources = this;
3317-
return new AnonymousObservable(function (o) {
3318-
var group = new CompositeDisposable(),
3319-
isStopped = false,
3311+
var MergeAllObservable = (function (__super__) {
3312+
inherits(MergeAllObservable, __super__);
3313+
3314+
function MergeAllObservable(source) {
3315+
this.source = source;
3316+
__super__.call(this);
3317+
}
3318+
3319+
MergeAllObservable.prototype.subscribeCore = function (observer) {
3320+
var g = new CompositeDisposable(),
33203321
m = new SingleAssignmentDisposable();
3322+
g.add(m);
3323+
m.setDisposable(this.source.subscribe(new MergeAllObserver(observer, g)));
3324+
return g;
3325+
};
33213326

3322-
group.add(m);
3323-
m.setDisposable(sources.subscribe(function (innerSource) {
3324-
var innerSubscription = new SingleAssignmentDisposable();
3325-
group.add(innerSubscription);
3327+
return MergeAllObservable;
3328+
}(ObservableBase));
33263329

3327-
// Check for promises support
3328-
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
3330+
var MergeAllObserver = (function (__super__) {
33293331

3330-
innerSubscription.setDisposable(innerSource.subscribe(function (x) { o.onNext(x); }, function (e) { o.onError(e); }, function () {
3331-
group.remove(innerSubscription);
3332-
isStopped && group.length === 1 && o.onCompleted();
3333-
}));
3334-
}, function (e) { o.onError(e); }, function () {
3335-
isStopped = true;
3336-
group.length === 1 && o.onCompleted();
3337-
}));
3338-
return group;
3339-
}, sources);
3340-
};
3332+
inherits(MergeAllObserver, __super__);
3333+
3334+
function MergeAllObserver(observer, group) {
3335+
this.observer = observer;
3336+
this.group = group;
3337+
this.stopped = false;
3338+
__super__.call(this);
3339+
}
3340+
3341+
MergeAllObserver.prototype.next = function (innerSource) {
3342+
var innerSubscription = new SingleAssignmentDisposable();
3343+
this.group.add(innerSubscription);
3344+
3345+
// Check for promises support
3346+
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
3347+
3348+
innerSubscription.setDisposable(innerSource.subscribe(new InnerObserver(this, innerSubscription)));
3349+
};
3350+
3351+
MergeAllObserver.prototype.error = function (e) {
3352+
this.observer.onError(e);
3353+
};
3354+
3355+
MergeAllObserver.prototype.completed = function () {
3356+
this.stopped = true;
3357+
this.group.length === 1 && this.observer.onCompleted();
3358+
};
3359+
3360+
var InnerObserver = (function (__base__) {
3361+
inherits(InnerObserver, __base__)
3362+
function InnerObserver(parent, innerSubscription) {
3363+
this.parent = parent;
3364+
this.innerSubscription = innerSubscription;
3365+
__base__.call(this);
3366+
}
3367+
InnerObserver.prototype.next = function (x) { this.parent.observer.onNext(x); };
3368+
InnerObserver.prototype.error = function (e) { this.parent.observer.onError(e); };
3369+
InnerObserver.prototype.completed = function () {
3370+
this.parent.group.remove(this.innerSubscription);
3371+
this.parent.stopped && this.parent.group.length === 1 && this.parent.observer.onCompleted();
3372+
};
3373+
3374+
return InnerObserver;
3375+
}(AbstractObserver));
3376+
3377+
return MergeAllObserver;
3378+
3379+
}(AbstractObserver));
33413380

33423381
/**
3343-
* @deprecated use #mergeAll instead.
3344-
*/
3345-
observableProto.mergeObservable = function () {
3346-
//deprecate('mergeObservable', 'mergeAll');
3347-
return this.mergeAll.apply(this, arguments);
3382+
* Merges an observable sequence of observable sequences into an observable sequence.
3383+
* @returns {Observable} The observable sequence that merges the elements of the inner sequences.
3384+
*/
3385+
observableProto.mergeAll = observableProto.mergeObservable = function () {
3386+
return new MergeAllObservable(this);
33483387
};
33493388

33503389
/**

dist/rx.all.compat.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.compat.min.js

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.js

Lines changed: 70 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3119,43 +3119,82 @@
31193119
return observableOf(scheduler, sources).mergeAll();
31203120
};
31213121

3122-
/**
3123-
* Merges an observable sequence of observable sequences into an observable sequence.
3124-
* @returns {Observable} The observable sequence that merges the elements of the inner sequences.
3125-
*/
3126-
observableProto.mergeAll = function () {
3127-
var sources = this;
3128-
return new AnonymousObservable(function (o) {
3129-
var group = new CompositeDisposable(),
3130-
isStopped = false,
3122+
var MergeAllObservable = (function (__super__) {
3123+
inherits(MergeAllObservable, __super__);
3124+
3125+
function MergeAllObservable(source) {
3126+
this.source = source;
3127+
__super__.call(this);
3128+
}
3129+
3130+
MergeAllObservable.prototype.subscribeCore = function (observer) {
3131+
var g = new CompositeDisposable(),
31313132
m = new SingleAssignmentDisposable();
3133+
g.add(m);
3134+
m.setDisposable(this.source.subscribe(new MergeAllObserver(observer, g)));
3135+
return g;
3136+
};
31323137

3133-
group.add(m);
3134-
m.setDisposable(sources.subscribe(function (innerSource) {
3135-
var innerSubscription = new SingleAssignmentDisposable();
3136-
group.add(innerSubscription);
3138+
return MergeAllObservable;
3139+
}(ObservableBase));
31373140

3138-
// Check for promises support
3139-
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
3141+
var MergeAllObserver = (function (__super__) {
31403142

3141-
innerSubscription.setDisposable(innerSource.subscribe(function (x) { o.onNext(x); }, function (e) { o.onError(e); }, function () {
3142-
group.remove(innerSubscription);
3143-
isStopped && group.length === 1 && o.onCompleted();
3144-
}));
3145-
}, function (e) { o.onError(e); }, function () {
3146-
isStopped = true;
3147-
group.length === 1 && o.onCompleted();
3148-
}));
3149-
return group;
3150-
}, sources);
3151-
};
3143+
inherits(MergeAllObserver, __super__);
3144+
3145+
function MergeAllObserver(observer, group) {
3146+
this.observer = observer;
3147+
this.group = group;
3148+
this.stopped = false;
3149+
__super__.call(this);
3150+
}
3151+
3152+
MergeAllObserver.prototype.next = function (innerSource) {
3153+
var innerSubscription = new SingleAssignmentDisposable();
3154+
this.group.add(innerSubscription);
3155+
3156+
// Check for promises support
3157+
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
3158+
3159+
innerSubscription.setDisposable(innerSource.subscribe(new InnerObserver(this, innerSubscription)));
3160+
};
3161+
3162+
MergeAllObserver.prototype.error = function (e) {
3163+
this.observer.onError(e);
3164+
};
3165+
3166+
MergeAllObserver.prototype.completed = function () {
3167+
this.stopped = true;
3168+
this.group.length === 1 && this.observer.onCompleted();
3169+
};
3170+
3171+
var InnerObserver = (function (__base__) {
3172+
inherits(InnerObserver, __base__)
3173+
function InnerObserver(parent, innerSubscription) {
3174+
this.parent = parent;
3175+
this.innerSubscription = innerSubscription;
3176+
__base__.call(this);
3177+
}
3178+
InnerObserver.prototype.next = function (x) { this.parent.observer.onNext(x); };
3179+
InnerObserver.prototype.error = function (e) { this.parent.observer.onError(e); };
3180+
InnerObserver.prototype.completed = function () {
3181+
this.parent.group.remove(this.innerSubscription);
3182+
this.parent.stopped && this.parent.group.length === 1 && this.parent.observer.onCompleted();
3183+
};
3184+
3185+
return InnerObserver;
3186+
}(AbstractObserver));
3187+
3188+
return MergeAllObserver;
3189+
3190+
}(AbstractObserver));
31523191

31533192
/**
3154-
* @deprecated use #mergeAll instead.
3155-
*/
3156-
observableProto.mergeObservable = function () {
3157-
//deprecate('mergeObservable', 'mergeAll');
3158-
return this.mergeAll.apply(this, arguments);
3193+
* Merges an observable sequence of observable sequences into an observable sequence.
3194+
* @returns {Observable} The observable sequence that merges the elements of the inner sequences.
3195+
*/
3196+
observableProto.mergeAll = observableProto.mergeObservable = function () {
3197+
return new MergeAllObservable(this);
31593198
};
31603199

31613200
/**

dist/rx.all.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.min.js

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.compat.js

Lines changed: 70 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3308,43 +3308,82 @@
33083308
return observableOf(scheduler, sources).mergeAll();
33093309
};
33103310

3311-
/**
3312-
* Merges an observable sequence of observable sequences into an observable sequence.
3313-
* @returns {Observable} The observable sequence that merges the elements of the inner sequences.
3314-
*/
3315-
observableProto.mergeAll = function () {
3316-
var sources = this;
3317-
return new AnonymousObservable(function (o) {
3318-
var group = new CompositeDisposable(),
3319-
isStopped = false,
3311+
var MergeAllObservable = (function (__super__) {
3312+
inherits(MergeAllObservable, __super__);
3313+
3314+
function MergeAllObservable(source) {
3315+
this.source = source;
3316+
__super__.call(this);
3317+
}
3318+
3319+
MergeAllObservable.prototype.subscribeCore = function (observer) {
3320+
var g = new CompositeDisposable(),
33203321
m = new SingleAssignmentDisposable();
3322+
g.add(m);
3323+
m.setDisposable(this.source.subscribe(new MergeAllObserver(observer, g)));
3324+
return g;
3325+
};
33213326

3322-
group.add(m);
3323-
m.setDisposable(sources.subscribe(function (innerSource) {
3324-
var innerSubscription = new SingleAssignmentDisposable();
3325-
group.add(innerSubscription);
3327+
return MergeAllObservable;
3328+
}(ObservableBase));
33263329

3327-
// Check for promises support
3328-
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
3330+
var MergeAllObserver = (function (__super__) {
33293331

3330-
innerSubscription.setDisposable(innerSource.subscribe(function (x) { o.onNext(x); }, function (e) { o.onError(e); }, function () {
3331-
group.remove(innerSubscription);
3332-
isStopped && group.length === 1 && o.onCompleted();
3333-
}));
3334-
}, function (e) { o.onError(e); }, function () {
3335-
isStopped = true;
3336-
group.length === 1 && o.onCompleted();
3337-
}));
3338-
return group;
3339-
}, sources);
3340-
};
3332+
inherits(MergeAllObserver, __super__);
3333+
3334+
function MergeAllObserver(observer, group) {
3335+
this.observer = observer;
3336+
this.group = group;
3337+
this.stopped = false;
3338+
__super__.call(this);
3339+
}
3340+
3341+
MergeAllObserver.prototype.next = function (innerSource) {
3342+
var innerSubscription = new SingleAssignmentDisposable();
3343+
this.group.add(innerSubscription);
3344+
3345+
// Check for promises support
3346+
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
3347+
3348+
innerSubscription.setDisposable(innerSource.subscribe(new InnerObserver(this, innerSubscription)));
3349+
};
3350+
3351+
MergeAllObserver.prototype.error = function (e) {
3352+
this.observer.onError(e);
3353+
};
3354+
3355+
MergeAllObserver.prototype.completed = function () {
3356+
this.stopped = true;
3357+
this.group.length === 1 && this.observer.onCompleted();
3358+
};
3359+
3360+
var InnerObserver = (function (__base__) {
3361+
inherits(InnerObserver, __base__)
3362+
function InnerObserver(parent, innerSubscription) {
3363+
this.parent = parent;
3364+
this.innerSubscription = innerSubscription;
3365+
__base__.call(this);
3366+
}
3367+
InnerObserver.prototype.next = function (x) { this.parent.observer.onNext(x); };
3368+
InnerObserver.prototype.error = function (e) { this.parent.observer.onError(e); };
3369+
InnerObserver.prototype.completed = function () {
3370+
this.parent.group.remove(this.innerSubscription);
3371+
this.parent.stopped && this.parent.group.length === 1 && this.parent.observer.onCompleted();
3372+
};
3373+
3374+
return InnerObserver;
3375+
}(AbstractObserver));
3376+
3377+
return MergeAllObserver;
3378+
3379+
}(AbstractObserver));
33413380

33423381
/**
3343-
* @deprecated use #mergeAll instead.
3344-
*/
3345-
observableProto.mergeObservable = function () {
3346-
//deprecate('mergeObservable', 'mergeAll');
3347-
return this.mergeAll.apply(this, arguments);
3382+
* Merges an observable sequence of observable sequences into an observable sequence.
3383+
* @returns {Observable} The observable sequence that merges the elements of the inner sequences.
3384+
*/
3385+
observableProto.mergeAll = observableProto.mergeObservable = function () {
3386+
return new MergeAllObservable(this);
33483387
};
33493388

33503389
/**

dist/rx.compat.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.compat.min.js

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)