Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit ad356e9

Browse files
Fixing switch perf
1 parent d36c511 commit ad356e9

26 files changed

Lines changed: 837 additions & 366 deletions

Gruntfile.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ module.exports = function (grunt) {
109109
'src/core/linq/observable/onerrorresumenextproto.js',
110110
'src/core/linq/observable/onerrorresumenext.js',
111111
'src/core/linq/observable/skipuntil.js',
112-
'src/core/linq/observable/switch.js',
112+
'src/core/perf/operators/switch.js',
113113
'src/core/perf/operators/takeuntil.js',
114114
'src/core/linq/observable/withlatestfrom.js',
115115
'src/core/linq/observable/zipproto.js',
@@ -403,7 +403,7 @@ module.exports = function (grunt) {
403403
'src/core/linq/observable/onerrorresumenextproto.js',
404404
'src/core/linq/observable/onerrorresumenext.js',
405405
'src/core/linq/observable/skipuntil.js',
406-
'src/core/linq/observable/switch.js',
406+
'src/core/perf/operators/switch.js',
407407
'src/core/perf/operators/takeuntil.js',
408408
'src/core/linq/observable/withlatestfrom.js',
409409
'src/core/linq/observable/zipproto.js',
@@ -697,7 +697,7 @@ module.exports = function (grunt) {
697697
'src/core/linq/observable/onerrorresumenextproto.js',
698698
'src/core/linq/observable/onerrorresumenext.js',
699699
'src/core/linq/observable/skipuntil.js',
700-
'src/core/linq/observable/switch.js',
700+
'src/core/perf/operators/switch.js',
701701
'src/core/perf/operators/takeuntil.js',
702702
'src/core/linq/observable/withlatestfrom.js',
703703
'src/core/linq/observable/zipproto.js',
@@ -846,7 +846,7 @@ module.exports = function (grunt) {
846846
'src/core/linq/observable/onerrorresumenextproto.js',
847847
'src/core/linq/observable/onerrorresumenext.js',
848848
'src/core/linq/observable/skipuntil.js',
849-
'src/core/linq/observable/switch.js',
849+
'src/core/perf/operators/switch.js',
850850
'src/core/perf/operators/takeuntil.js',
851851
'src/core/linq/observable/withlatestfrom.js',
852852
'src/core/linq/observable/zipproto.js',
@@ -977,7 +977,7 @@ module.exports = function (grunt) {
977977
'src/core/linq/observable/mergedelayerror.js',
978978
'src/core/perf/operators/mergeall.js',
979979
'src/core/linq/observable/skipuntil.js',
980-
'src/core/linq/observable/switch.js',
980+
'src/core/perf/operators/switch.js',
981981
'src/core/perf/operators/takeuntil.js',
982982
'src/core/linq/observable/withlatestfrom.js',
983983
'src/core/linq/observable/zipproto.js',
@@ -1147,7 +1147,7 @@ module.exports = function (grunt) {
11471147
'src/core/perf/operators/mergeall.js',
11481148
'src/core/linq/observable/mergedelayerror.js',
11491149
'src/core/linq/observable/skipuntil.js',
1150-
'src/core/linq/observable/switch.js',
1150+
'src/core/perf/operators/switch.js',
11511151
'src/core/perf/operators/takeuntil.js',
11521152
'src/core/linq/observable/withlatestfrom.js',
11531153
'src/core/linq/observable/zipproto.js',

dist/rx.all.compat.js

Lines changed: 92 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3874,43 +3874,100 @@
38743874
}, source);
38753875
};
38763876

3877+
var SwitchObservable = (function(__super__) {
3878+
inherits(SwitchObservable, __super__);
3879+
function SwitchObservable(source) {
3880+
this.source = source;
3881+
__super__.call(this);
3882+
}
3883+
3884+
SwitchObservable.prototype.subscribeCore = function (o) {
3885+
var inner = new SerialDisposable(), s = this.source.subscribe(new SwitchObserver(o, inner));
3886+
return new CompositeDisposable(s, inner);
3887+
};
3888+
3889+
function SwitchObserver(o, inner) {
3890+
this.o = o;
3891+
this.inner = inner;
3892+
this.stopped = false;
3893+
this.latest = 0;
3894+
this.hasLatest = false;
3895+
this.isStopped = false;
3896+
}
3897+
SwitchObserver.prototype.onNext = function (innerSource) {
3898+
if (this.isStopped) { return; }
3899+
var d = new SingleAssignmentDisposable(), id = ++this.latest;
3900+
this.hasLatest = true;
3901+
this.inner.setDisposable(d);
3902+
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
3903+
d.setDisposable(innerSource.subscribe(new InnerObserver(this, id)));
3904+
};
3905+
SwitchObserver.prototype.onError = function (e) {
3906+
if (!this.isStopped) {
3907+
this.isStopped = true;
3908+
this.o.onError(e);
3909+
}
3910+
};
3911+
SwitchObserver.prototype.onCompleted = function () {
3912+
if (!this.isStopped) {
3913+
this.isStopped = true;
3914+
this.stopped = true;
3915+
!this.hasLatest && this.o.onCompleted();
3916+
}
3917+
};
3918+
SwitchObserver.prototype.dispose = function () { this.isStopped = true; };
3919+
SwitchObserver.prototype.fail = function (e) {
3920+
if(!this.isStopped) {
3921+
this.isStopped = true;
3922+
this.o.onError(e);
3923+
return true;
3924+
}
3925+
return false;
3926+
};
3927+
3928+
function InnerObserver(parent, id) {
3929+
this.parent = parent;
3930+
this.id = id;
3931+
this.isStopped = false;
3932+
}
3933+
InnerObserver.prototype.onNext = function (x) {
3934+
if (this.isStopped) { return; }
3935+
this.parent.latest === this.id && this.parent.o.onNext(x);
3936+
};
3937+
InnerObserver.prototype.onError = function (e) {
3938+
if (!this.isStopped) {
3939+
this.isStopped = true;
3940+
this.parent.latest === this.id && this.parent.o.onError(e);
3941+
}
3942+
};
3943+
InnerObserver.prototype.onCompleted = function () {
3944+
if (!this.isStopped) {
3945+
this.isStopped = true;
3946+
if (this.parent.latest === this.id) {
3947+
this.parent.hasLatest = false;
3948+
this.parent.isStopped && this.parent.o.onCompleted();
3949+
}
3950+
}
3951+
};
3952+
InnerObserver.prototype.dispose = function () { this.isStopped = true; }
3953+
InnerObserver.prototype.fail = function (e) {
3954+
if(!this.isStopped) {
3955+
this.isStopped = true;
3956+
this.parent.o.onError(e);
3957+
return true;
3958+
}
3959+
return false;
3960+
};
3961+
3962+
return SwitchObservable;
3963+
}(ObservableBase));
3964+
38773965
/**
3878-
* Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.
3879-
* @returns {Observable} The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received.
3880-
*/
3966+
* Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.
3967+
* @returns {Observable} The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received.
3968+
*/
38813969
observableProto['switch'] = observableProto.switchLatest = function () {
3882-
var sources = this;
3883-
return new AnonymousObservable(function (observer) {
3884-
var hasLatest = false,
3885-
innerSubscription = new SerialDisposable(),
3886-
isStopped = false,
3887-
latest = 0,
3888-
subscription = sources.subscribe(
3889-
function (innerSource) {
3890-
var d = new SingleAssignmentDisposable(), id = ++latest;
3891-
hasLatest = true;
3892-
innerSubscription.setDisposable(d);
3893-
3894-
// Check if Promise or Observable
3895-
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
3896-
3897-
d.setDisposable(innerSource.subscribe(
3898-
function (x) { latest === id && observer.onNext(x); },
3899-
function (e) { latest === id && observer.onError(e); },
3900-
function () {
3901-
if (latest === id) {
3902-
hasLatest = false;
3903-
isStopped && observer.onCompleted();
3904-
}
3905-
}));
3906-
},
3907-
function (e) { observer.onError(e); },
3908-
function () {
3909-
isStopped = true;
3910-
!hasLatest && observer.onCompleted();
3911-
});
3912-
return new CompositeDisposable(subscription, innerSubscription);
3913-
}, sources);
3970+
return new SwitchObservable(this);
39143971
};
39153972

39163973
var TakeUntilObservable = (function(__super__) {

dist/rx.all.compat.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.compat.min.js

Lines changed: 4 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.js

Lines changed: 92 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3729,43 +3729,100 @@
37293729
}, source);
37303730
};
37313731

3732+
var SwitchObservable = (function(__super__) {
3733+
inherits(SwitchObservable, __super__);
3734+
function SwitchObservable(source) {
3735+
this.source = source;
3736+
__super__.call(this);
3737+
}
3738+
3739+
SwitchObservable.prototype.subscribeCore = function (o) {
3740+
var inner = new SerialDisposable(), s = this.source.subscribe(new SwitchObserver(o, inner));
3741+
return new CompositeDisposable(s, inner);
3742+
};
3743+
3744+
function SwitchObserver(o, inner) {
3745+
this.o = o;
3746+
this.inner = inner;
3747+
this.stopped = false;
3748+
this.latest = 0;
3749+
this.hasLatest = false;
3750+
this.isStopped = false;
3751+
}
3752+
SwitchObserver.prototype.onNext = function (innerSource) {
3753+
if (this.isStopped) { return; }
3754+
var d = new SingleAssignmentDisposable(), id = ++this.latest;
3755+
this.hasLatest = true;
3756+
this.inner.setDisposable(d);
3757+
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
3758+
d.setDisposable(innerSource.subscribe(new InnerObserver(this, id)));
3759+
};
3760+
SwitchObserver.prototype.onError = function (e) {
3761+
if (!this.isStopped) {
3762+
this.isStopped = true;
3763+
this.o.onError(e);
3764+
}
3765+
};
3766+
SwitchObserver.prototype.onCompleted = function () {
3767+
if (!this.isStopped) {
3768+
this.isStopped = true;
3769+
this.stopped = true;
3770+
!this.hasLatest && this.o.onCompleted();
3771+
}
3772+
};
3773+
SwitchObserver.prototype.dispose = function () { this.isStopped = true; };
3774+
SwitchObserver.prototype.fail = function (e) {
3775+
if(!this.isStopped) {
3776+
this.isStopped = true;
3777+
this.o.onError(e);
3778+
return true;
3779+
}
3780+
return false;
3781+
};
3782+
3783+
function InnerObserver(parent, id) {
3784+
this.parent = parent;
3785+
this.id = id;
3786+
this.isStopped = false;
3787+
}
3788+
InnerObserver.prototype.onNext = function (x) {
3789+
if (this.isStopped) { return; }
3790+
this.parent.latest === this.id && this.parent.o.onNext(x);
3791+
};
3792+
InnerObserver.prototype.onError = function (e) {
3793+
if (!this.isStopped) {
3794+
this.isStopped = true;
3795+
this.parent.latest === this.id && this.parent.o.onError(e);
3796+
}
3797+
};
3798+
InnerObserver.prototype.onCompleted = function () {
3799+
if (!this.isStopped) {
3800+
this.isStopped = true;
3801+
if (this.parent.latest === this.id) {
3802+
this.parent.hasLatest = false;
3803+
this.parent.isStopped && this.parent.o.onCompleted();
3804+
}
3805+
}
3806+
};
3807+
InnerObserver.prototype.dispose = function () { this.isStopped = true; }
3808+
InnerObserver.prototype.fail = function (e) {
3809+
if(!this.isStopped) {
3810+
this.isStopped = true;
3811+
this.parent.o.onError(e);
3812+
return true;
3813+
}
3814+
return false;
3815+
};
3816+
3817+
return SwitchObservable;
3818+
}(ObservableBase));
3819+
37323820
/**
3733-
* Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.
3734-
* @returns {Observable} The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received.
3735-
*/
3821+
* Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.
3822+
* @returns {Observable} The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received.
3823+
*/
37363824
observableProto['switch'] = observableProto.switchLatest = function () {
3737-
var sources = this;
3738-
return new AnonymousObservable(function (observer) {
3739-
var hasLatest = false,
3740-
innerSubscription = new SerialDisposable(),
3741-
isStopped = false,
3742-
latest = 0,
3743-
subscription = sources.subscribe(
3744-
function (innerSource) {
3745-
var d = new SingleAssignmentDisposable(), id = ++latest;
3746-
hasLatest = true;
3747-
innerSubscription.setDisposable(d);
3748-
3749-
// Check if Promise or Observable
3750-
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
3751-
3752-
d.setDisposable(innerSource.subscribe(
3753-
function (x) { latest === id && observer.onNext(x); },
3754-
function (e) { latest === id && observer.onError(e); },
3755-
function () {
3756-
if (latest === id) {
3757-
hasLatest = false;
3758-
isStopped && observer.onCompleted();
3759-
}
3760-
}));
3761-
},
3762-
function (e) { observer.onError(e); },
3763-
function () {
3764-
isStopped = true;
3765-
!hasLatest && observer.onCompleted();
3766-
});
3767-
return new CompositeDisposable(subscription, innerSubscription);
3768-
}, sources);
3825+
return new SwitchObservable(this);
37693826
};
37703827

37713828
var TakeUntilObservable = (function(__super__) {

dist/rx.all.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.min.js

Lines changed: 4 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)