Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit 7f32d00

Browse files
Fixing onErrorResumeNext for perf
1 parent 90857e7 commit 7f32d00

25 files changed

Lines changed: 469 additions & 217 deletions

dist/rx.all.compat.js

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4262,37 +4262,65 @@ var ObserveOnObservable = (function (__super__) {
42624262
return onErrorResumeNext([this, second]);
42634263
};
42644264

4265+
var OnErrorResumeNextObservable = (function(__super__) {
4266+
inherits(OnErrorResumeNextObservable, __super__);
4267+
function OnErrorResumeNextObservable(sources) {
4268+
this.sources = sources;
4269+
__super__.call(this);
4270+
}
4271+
4272+
function scheduleMethod(state, recurse) {
4273+
if (state.pos < state.sources.length) {
4274+
var current = state.sources[state.pos++];
4275+
isPromise(current) && (current = observableFromPromise(current));
4276+
var d = new SingleAssignmentDisposable();
4277+
state.subscription.setDisposable(d);
4278+
d.setDisposable(current.subscribe(new OnErrorResumeNextObserver(state, recurse)));
4279+
} else {
4280+
state.o.onCompleted();
4281+
}
4282+
}
4283+
4284+
OnErrorResumeNextObservable.prototype.subscribeCore = function (o) {
4285+
var subscription = new SerialDisposable(),
4286+
state = {pos: 0, subscription: subscription, o: o, sources: this.sources },
4287+
cancellable = immediateScheduler.scheduleRecursive(state, scheduleMethod);
4288+
4289+
return new BinaryDisposable(subscription, cancellable);
4290+
};
4291+
4292+
return OnErrorResumeNextObservable;
4293+
}(ObservableBase));
4294+
4295+
var OnErrorResumeNextObserver = (function(__super__) {
4296+
inherits(OnErrorResumeNextObserver, __super__);
4297+
function OnErrorResumeNextObserver(state, recurse) {
4298+
this._state = state;
4299+
this._recurse = recurse;
4300+
__super__.call(this);
4301+
}
4302+
4303+
OnErrorResumeNextObserver.prototype.next = function (x) { this._state.o.onNext(x); };
4304+
OnErrorResumeNextObserver.prototype.error = function () { this._recurse(this._state); };
4305+
OnErrorResumeNextObserver.prototype.completed = function () { this._recurse(this._state); };
4306+
4307+
return OnErrorResumeNextObserver;
4308+
}(AbstractObserver));
4309+
42654310
/**
42664311
* Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.
4267-
*
4268-
* @example
4269-
* 1 - res = Rx.Observable.onErrorResumeNext(xs, ys, zs);
4270-
* 1 - res = Rx.Observable.onErrorResumeNext([xs, ys, zs]);
42714312
* @returns {Observable} An observable sequence that concatenates the source sequences, even if a sequence terminates exceptionally.
42724313
*/
42734314
var onErrorResumeNext = Observable.onErrorResumeNext = function () {
42744315
var sources = [];
42754316
if (Array.isArray(arguments[0])) {
42764317
sources = arguments[0];
42774318
} else {
4278-
for(var i = 0, len = arguments.length; i < len; i++) { sources.push(arguments[i]); }
4319+
var len = arguments.length;
4320+
sources = new Array(len);
4321+
for(var i = 0; i < len; i++) { sources[i] = arguments[i]; }
42794322
}
4280-
return new AnonymousObservable(function (observer) {
4281-
var pos = 0, subscription = new SerialDisposable(),
4282-
cancelable = immediateScheduler.scheduleRecursive(null, function (_, self) {
4283-
var current, d;
4284-
if (pos < sources.length) {
4285-
current = sources[pos++];
4286-
isPromise(current) && (current = observableFromPromise(current));
4287-
d = new SingleAssignmentDisposable();
4288-
subscription.setDisposable(d);
4289-
d.setDisposable(current.subscribe(observer.onNext.bind(observer), self, self));
4290-
} else {
4291-
observer.onCompleted();
4292-
}
4293-
});
4294-
return new BinaryDisposable(subscription, cancelable);
4295-
});
4323+
return new OnErrorResumeNextObservable(sources);
42964324
};
42974325

42984326
var SkipUntilObservable = (function(__super__) {

dist/rx.all.compat.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.compat.min.js

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.js

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3940,37 +3940,65 @@ var ObserveOnObservable = (function (__super__) {
39403940
return onErrorResumeNext([this, second]);
39413941
};
39423942

3943+
var OnErrorResumeNextObservable = (function(__super__) {
3944+
inherits(OnErrorResumeNextObservable, __super__);
3945+
function OnErrorResumeNextObservable(sources) {
3946+
this.sources = sources;
3947+
__super__.call(this);
3948+
}
3949+
3950+
function scheduleMethod(state, recurse) {
3951+
if (state.pos < state.sources.length) {
3952+
var current = state.sources[state.pos++];
3953+
isPromise(current) && (current = observableFromPromise(current));
3954+
var d = new SingleAssignmentDisposable();
3955+
state.subscription.setDisposable(d);
3956+
d.setDisposable(current.subscribe(new OnErrorResumeNextObserver(state, recurse)));
3957+
} else {
3958+
state.o.onCompleted();
3959+
}
3960+
}
3961+
3962+
OnErrorResumeNextObservable.prototype.subscribeCore = function (o) {
3963+
var subscription = new SerialDisposable(),
3964+
state = {pos: 0, subscription: subscription, o: o, sources: this.sources },
3965+
cancellable = immediateScheduler.scheduleRecursive(state, scheduleMethod);
3966+
3967+
return new BinaryDisposable(subscription, cancellable);
3968+
};
3969+
3970+
return OnErrorResumeNextObservable;
3971+
}(ObservableBase));
3972+
3973+
var OnErrorResumeNextObserver = (function(__super__) {
3974+
inherits(OnErrorResumeNextObserver, __super__);
3975+
function OnErrorResumeNextObserver(state, recurse) {
3976+
this._state = state;
3977+
this._recurse = recurse;
3978+
__super__.call(this);
3979+
}
3980+
3981+
OnErrorResumeNextObserver.prototype.next = function (x) { this._state.o.onNext(x); };
3982+
OnErrorResumeNextObserver.prototype.error = function () { this._recurse(this._state); };
3983+
OnErrorResumeNextObserver.prototype.completed = function () { this._recurse(this._state); };
3984+
3985+
return OnErrorResumeNextObserver;
3986+
}(AbstractObserver));
3987+
39433988
/**
39443989
* Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.
3945-
*
3946-
* @example
3947-
* 1 - res = Rx.Observable.onErrorResumeNext(xs, ys, zs);
3948-
* 1 - res = Rx.Observable.onErrorResumeNext([xs, ys, zs]);
39493990
* @returns {Observable} An observable sequence that concatenates the source sequences, even if a sequence terminates exceptionally.
39503991
*/
39513992
var onErrorResumeNext = Observable.onErrorResumeNext = function () {
39523993
var sources = [];
39533994
if (Array.isArray(arguments[0])) {
39543995
sources = arguments[0];
39553996
} else {
3956-
for(var i = 0, len = arguments.length; i < len; i++) { sources.push(arguments[i]); }
3997+
var len = arguments.length;
3998+
sources = new Array(len);
3999+
for(var i = 0; i < len; i++) { sources[i] = arguments[i]; }
39574000
}
3958-
return new AnonymousObservable(function (observer) {
3959-
var pos = 0, subscription = new SerialDisposable(),
3960-
cancelable = immediateScheduler.scheduleRecursive(null, function (_, self) {
3961-
var current, d;
3962-
if (pos < sources.length) {
3963-
current = sources[pos++];
3964-
isPromise(current) && (current = observableFromPromise(current));
3965-
d = new SingleAssignmentDisposable();
3966-
subscription.setDisposable(d);
3967-
d.setDisposable(current.subscribe(observer.onNext.bind(observer), self, self));
3968-
} else {
3969-
observer.onCompleted();
3970-
}
3971-
});
3972-
return new BinaryDisposable(subscription, cancelable);
3973-
});
4001+
return new OnErrorResumeNextObservable(sources);
39744002
};
39754003

39764004
var SkipUntilObservable = (function(__super__) {

dist/rx.all.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.min.js

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.compat.js

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4262,37 +4262,65 @@ var ObserveOnObservable = (function (__super__) {
42624262
return onErrorResumeNext([this, second]);
42634263
};
42644264

4265+
var OnErrorResumeNextObservable = (function(__super__) {
4266+
inherits(OnErrorResumeNextObservable, __super__);
4267+
function OnErrorResumeNextObservable(sources) {
4268+
this.sources = sources;
4269+
__super__.call(this);
4270+
}
4271+
4272+
function scheduleMethod(state, recurse) {
4273+
if (state.pos < state.sources.length) {
4274+
var current = state.sources[state.pos++];
4275+
isPromise(current) && (current = observableFromPromise(current));
4276+
var d = new SingleAssignmentDisposable();
4277+
state.subscription.setDisposable(d);
4278+
d.setDisposable(current.subscribe(new OnErrorResumeNextObserver(state, recurse)));
4279+
} else {
4280+
state.o.onCompleted();
4281+
}
4282+
}
4283+
4284+
OnErrorResumeNextObservable.prototype.subscribeCore = function (o) {
4285+
var subscription = new SerialDisposable(),
4286+
state = {pos: 0, subscription: subscription, o: o, sources: this.sources },
4287+
cancellable = immediateScheduler.scheduleRecursive(state, scheduleMethod);
4288+
4289+
return new BinaryDisposable(subscription, cancellable);
4290+
};
4291+
4292+
return OnErrorResumeNextObservable;
4293+
}(ObservableBase));
4294+
4295+
var OnErrorResumeNextObserver = (function(__super__) {
4296+
inherits(OnErrorResumeNextObserver, __super__);
4297+
function OnErrorResumeNextObserver(state, recurse) {
4298+
this._state = state;
4299+
this._recurse = recurse;
4300+
__super__.call(this);
4301+
}
4302+
4303+
OnErrorResumeNextObserver.prototype.next = function (x) { this._state.o.onNext(x); };
4304+
OnErrorResumeNextObserver.prototype.error = function () { this._recurse(this._state); };
4305+
OnErrorResumeNextObserver.prototype.completed = function () { this._recurse(this._state); };
4306+
4307+
return OnErrorResumeNextObserver;
4308+
}(AbstractObserver));
4309+
42654310
/**
42664311
* Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.
4267-
*
4268-
* @example
4269-
* 1 - res = Rx.Observable.onErrorResumeNext(xs, ys, zs);
4270-
* 1 - res = Rx.Observable.onErrorResumeNext([xs, ys, zs]);
42714312
* @returns {Observable} An observable sequence that concatenates the source sequences, even if a sequence terminates exceptionally.
42724313
*/
42734314
var onErrorResumeNext = Observable.onErrorResumeNext = function () {
42744315
var sources = [];
42754316
if (Array.isArray(arguments[0])) {
42764317
sources = arguments[0];
42774318
} else {
4278-
for(var i = 0, len = arguments.length; i < len; i++) { sources.push(arguments[i]); }
4319+
var len = arguments.length;
4320+
sources = new Array(len);
4321+
for(var i = 0; i < len; i++) { sources[i] = arguments[i]; }
42794322
}
4280-
return new AnonymousObservable(function (observer) {
4281-
var pos = 0, subscription = new SerialDisposable(),
4282-
cancelable = immediateScheduler.scheduleRecursive(null, function (_, self) {
4283-
var current, d;
4284-
if (pos < sources.length) {
4285-
current = sources[pos++];
4286-
isPromise(current) && (current = observableFromPromise(current));
4287-
d = new SingleAssignmentDisposable();
4288-
subscription.setDisposable(d);
4289-
d.setDisposable(current.subscribe(observer.onNext.bind(observer), self, self));
4290-
} else {
4291-
observer.onCompleted();
4292-
}
4293-
});
4294-
return new BinaryDisposable(subscription, cancelable);
4295-
});
4323+
return new OnErrorResumeNextObservable(sources);
42964324
};
42974325

42984326
var SkipUntilObservable = (function(__super__) {

dist/rx.compat.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.compat.min.js

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.js

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3894,37 +3894,65 @@ var ObserveOnObservable = (function (__super__) {
38943894
return onErrorResumeNext([this, second]);
38953895
};
38963896

3897+
var OnErrorResumeNextObservable = (function(__super__) {
3898+
inherits(OnErrorResumeNextObservable, __super__);
3899+
function OnErrorResumeNextObservable(sources) {
3900+
this.sources = sources;
3901+
__super__.call(this);
3902+
}
3903+
3904+
function scheduleMethod(state, recurse) {
3905+
if (state.pos < state.sources.length) {
3906+
var current = state.sources[state.pos++];
3907+
isPromise(current) && (current = observableFromPromise(current));
3908+
var d = new SingleAssignmentDisposable();
3909+
state.subscription.setDisposable(d);
3910+
d.setDisposable(current.subscribe(new OnErrorResumeNextObserver(state, recurse)));
3911+
} else {
3912+
state.o.onCompleted();
3913+
}
3914+
}
3915+
3916+
OnErrorResumeNextObservable.prototype.subscribeCore = function (o) {
3917+
var subscription = new SerialDisposable(),
3918+
state = {pos: 0, subscription: subscription, o: o, sources: this.sources },
3919+
cancellable = immediateScheduler.scheduleRecursive(state, scheduleMethod);
3920+
3921+
return new BinaryDisposable(subscription, cancellable);
3922+
};
3923+
3924+
return OnErrorResumeNextObservable;
3925+
}(ObservableBase));
3926+
3927+
var OnErrorResumeNextObserver = (function(__super__) {
3928+
inherits(OnErrorResumeNextObserver, __super__);
3929+
function OnErrorResumeNextObserver(state, recurse) {
3930+
this._state = state;
3931+
this._recurse = recurse;
3932+
__super__.call(this);
3933+
}
3934+
3935+
OnErrorResumeNextObserver.prototype.next = function (x) { this._state.o.onNext(x); };
3936+
OnErrorResumeNextObserver.prototype.error = function () { this._recurse(this._state); };
3937+
OnErrorResumeNextObserver.prototype.completed = function () { this._recurse(this._state); };
3938+
3939+
return OnErrorResumeNextObserver;
3940+
}(AbstractObserver));
3941+
38973942
/**
38983943
* Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.
3899-
*
3900-
* @example
3901-
* 1 - res = Rx.Observable.onErrorResumeNext(xs, ys, zs);
3902-
* 1 - res = Rx.Observable.onErrorResumeNext([xs, ys, zs]);
39033944
* @returns {Observable} An observable sequence that concatenates the source sequences, even if a sequence terminates exceptionally.
39043945
*/
39053946
var onErrorResumeNext = Observable.onErrorResumeNext = function () {
39063947
var sources = [];
39073948
if (Array.isArray(arguments[0])) {
39083949
sources = arguments[0];
39093950
} else {
3910-
for(var i = 0, len = arguments.length; i < len; i++) { sources.push(arguments[i]); }
3951+
var len = arguments.length;
3952+
sources = new Array(len);
3953+
for(var i = 0; i < len; i++) { sources[i] = arguments[i]; }
39113954
}
3912-
return new AnonymousObservable(function (observer) {
3913-
var pos = 0, subscription = new SerialDisposable(),
3914-
cancelable = immediateScheduler.scheduleRecursive(null, function (_, self) {
3915-
var current, d;
3916-
if (pos < sources.length) {
3917-
current = sources[pos++];
3918-
isPromise(current) && (current = observableFromPromise(current));
3919-
d = new SingleAssignmentDisposable();
3920-
subscription.setDisposable(d);
3921-
d.setDisposable(current.subscribe(observer.onNext.bind(observer), self, self));
3922-
} else {
3923-
observer.onCompleted();
3924-
}
3925-
});
3926-
return new BinaryDisposable(subscription, cancelable);
3927-
});
3955+
return new OnErrorResumeNextObservable(sources);
39283956
};
39293957

39303958
var SkipUntilObservable = (function(__super__) {

0 commit comments

Comments
 (0)