Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit 90857e7

Browse files
Fixing mergedelayerror
1 parent 6a70c93 commit 90857e7

38 files changed

Lines changed: 943 additions & 629 deletions

dist/rx.all.compat.js

Lines changed: 95 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1497,14 +1497,16 @@
14971497
}(Scheduler.prototype));
14981498

14991499
var SchedulePeriodicRecursive = Rx.internals.SchedulePeriodicRecursive = (function () {
1500-
function tick(command, recurse) {
1501-
recurse(0, this._period);
1502-
try {
1503-
this._state = this._action(this._state);
1504-
} catch (e) {
1505-
this._cancel.dispose();
1506-
throw e;
1507-
}
1500+
function createTick(self) {
1501+
return function tick(command, recurse) {
1502+
recurse(0, self._period);
1503+
var state = tryCatch(self._action)(self._state);
1504+
if (state === errorObj) {
1505+
self._cancel.dispose();
1506+
thrower(state.e);
1507+
}
1508+
self._state = state;
1509+
};
15081510
}
15091511

15101512
function SchedulePeriodicRecursive(scheduler, state, period, action) {
@@ -1517,7 +1519,7 @@
15171519
SchedulePeriodicRecursive.prototype.start = function () {
15181520
var d = new SingleAssignmentDisposable();
15191521
this._cancel = d;
1520-
d.setDisposable(this._scheduler.scheduleRecursiveFuture(0, this._period, tick.bind(this)));
1522+
d.setDisposable(this._scheduler.scheduleRecursiveFuture(0, this._period, createTick(this)));
15211523

15221524
return d;
15231525
};
@@ -4142,8 +4144,89 @@ var ObserveOnObservable = (function (__super__) {
41424144
this.message = 'This contains multiple errors. Check the innerErrors';
41434145
Error.call(this);
41444146
};
4145-
CompositeError.prototype = Error.prototype;
4146-
CompositeError.prototype.name = 'NotImplementedError';
4147+
CompositeError.prototype = Object.create(Error.prototype);
4148+
CompositeError.prototype.name = 'CompositeError';
4149+
4150+
var MergeDelayErrorObservable = (function(__super__) {
4151+
inherits(MergeDelayErrorObservable, __super__);
4152+
function MergeDelayErrorObservable(source) {
4153+
this.source = source;
4154+
__super__.call(this);
4155+
}
4156+
4157+
MergeDelayErrorObservable.prototype.subscribeCore = function (o) {
4158+
var group = new CompositeDisposable(),
4159+
m = new SingleAssignmentDisposable(),
4160+
state = { isStopped: false, errors: [], o: o };
4161+
4162+
group.add(m);
4163+
m.setDisposable(this.source.subscribe(new MergeDelayErrorObserver(group, state)));
4164+
4165+
return group;
4166+
};
4167+
4168+
return MergeDelayErrorObservable;
4169+
}(ObservableBase));
4170+
4171+
var MergeDelayErrorObserver = (function(__super__) {
4172+
inherits(MergeDelayErrorObserver, __super__);
4173+
function MergeDelayErrorObserver(group, state) {
4174+
this._group = group;
4175+
this._state = state;
4176+
__super__.call(this);
4177+
}
4178+
4179+
function setCompletion(o, errors) {
4180+
if (errors.length === 0) {
4181+
o.onCompleted();
4182+
} else if (errors.length === 1) {
4183+
o.onError(errors[0]);
4184+
} else {
4185+
o.onError(new CompositeError(errors));
4186+
}
4187+
}
4188+
4189+
MergeDelayErrorObserver.prototype.next = function (x) {
4190+
var inner = new SingleAssignmentDisposable();
4191+
this._group.add(inner);
4192+
4193+
// Check for promises support
4194+
isPromise(x) && (x = observableFromPromise(x));
4195+
inner.setDisposable(x.subscribe(new InnerObserver(inner, this._group, this._state)));
4196+
};
4197+
4198+
MergeDelayErrorObserver.prototype.error = function (e) {
4199+
this._state.errors.push(e);
4200+
this._state.isStopped = true;
4201+
this._group.length === 1 && setCompletion(this._state.o, this._state.errors);
4202+
};
4203+
4204+
MergeDelayErrorObserver.prototype.completed = function () {
4205+
this._state.isStopped = true;
4206+
this._group.length === 1 && setCompletion(this._state.o, this._state.errors);
4207+
};
4208+
4209+
inherits(InnerObserver, __super__);
4210+
function InnerObserver(inner, group, state) {
4211+
this._inner = inner;
4212+
this._group = group;
4213+
this._state = state;
4214+
__super__.call(this);
4215+
}
4216+
4217+
InnerObserver.prototype.next = function (x) { this._state.o.onNext(x); };
4218+
InnerObserver.prototype.error = function (e) {
4219+
this._state.errors.push(e);
4220+
this._group.remove(this._inner);
4221+
this._state.isStopped && this._group.length === 1 && setCompletion(this._state.o, this._state.errors);
4222+
};
4223+
InnerObserver.prototype.completed = function () {
4224+
this._group.remove(this._inner);
4225+
this._state.isStopped && this._group.length === 1 && setCompletion(this._state.o, this._state.errors);
4226+
};
4227+
4228+
return MergeDelayErrorObserver;
4229+
}(AbstractObserver));
41474230

41484231
/**
41494232
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
@@ -4166,56 +4249,7 @@ var ObserveOnObservable = (function (__super__) {
41664249
for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
41674250
}
41684251
var source = observableOf(null, args);
4169-
4170-
return new AnonymousObservable(function (o) {
4171-
var group = new CompositeDisposable(),
4172-
m = new SingleAssignmentDisposable(),
4173-
isStopped = false,
4174-
errors = [];
4175-
4176-
function setCompletion() {
4177-
if (errors.length === 0) {
4178-
o.onCompleted();
4179-
} else if (errors.length === 1) {
4180-
o.onError(errors[0]);
4181-
} else {
4182-
o.onError(new CompositeError(errors));
4183-
}
4184-
}
4185-
4186-
group.add(m);
4187-
4188-
m.setDisposable(source.subscribe(
4189-
function (innerSource) {
4190-
var innerSubscription = new SingleAssignmentDisposable();
4191-
group.add(innerSubscription);
4192-
4193-
// Check for promises support
4194-
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
4195-
4196-
innerSubscription.setDisposable(innerSource.subscribe(
4197-
function (x) { o.onNext(x); },
4198-
function (e) {
4199-
errors.push(e);
4200-
group.remove(innerSubscription);
4201-
isStopped && group.length === 1 && setCompletion();
4202-
},
4203-
function () {
4204-
group.remove(innerSubscription);
4205-
isStopped && group.length === 1 && setCompletion();
4206-
}));
4207-
},
4208-
function (e) {
4209-
errors.push(e);
4210-
isStopped = true;
4211-
group.length === 1 && setCompletion();
4212-
},
4213-
function () {
4214-
isStopped = true;
4215-
group.length === 1 && setCompletion();
4216-
}));
4217-
return group;
4218-
});
4252+
return new MergeDelayErrorObservable(source);
42194253
};
42204254

42214255
/**

dist/rx.all.compat.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.compat.min.js

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.js

Lines changed: 95 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,14 +1129,16 @@
11291129
}(Scheduler.prototype));
11301130

11311131
var SchedulePeriodicRecursive = Rx.internals.SchedulePeriodicRecursive = (function () {
1132-
function tick(command, recurse) {
1133-
recurse(0, this._period);
1134-
try {
1135-
this._state = this._action(this._state);
1136-
} catch (e) {
1137-
this._cancel.dispose();
1138-
throw e;
1139-
}
1132+
function createTick(self) {
1133+
return function tick(command, recurse) {
1134+
recurse(0, self._period);
1135+
var state = tryCatch(self._action)(self._state);
1136+
if (state === errorObj) {
1137+
self._cancel.dispose();
1138+
thrower(state.e);
1139+
}
1140+
self._state = state;
1141+
};
11401142
}
11411143

11421144
function SchedulePeriodicRecursive(scheduler, state, period, action) {
@@ -1149,7 +1151,7 @@
11491151
SchedulePeriodicRecursive.prototype.start = function () {
11501152
var d = new SingleAssignmentDisposable();
11511153
this._cancel = d;
1152-
d.setDisposable(this._scheduler.scheduleRecursiveFuture(0, this._period, tick.bind(this)));
1154+
d.setDisposable(this._scheduler.scheduleRecursiveFuture(0, this._period, createTick(this)));
11531155

11541156
return d;
11551157
};
@@ -3820,8 +3822,89 @@ var ObserveOnObservable = (function (__super__) {
38203822
this.message = 'This contains multiple errors. Check the innerErrors';
38213823
Error.call(this);
38223824
};
3823-
CompositeError.prototype = Error.prototype;
3824-
CompositeError.prototype.name = 'NotImplementedError';
3825+
CompositeError.prototype = Object.create(Error.prototype);
3826+
CompositeError.prototype.name = 'CompositeError';
3827+
3828+
var MergeDelayErrorObservable = (function(__super__) {
3829+
inherits(MergeDelayErrorObservable, __super__);
3830+
function MergeDelayErrorObservable(source) {
3831+
this.source = source;
3832+
__super__.call(this);
3833+
}
3834+
3835+
MergeDelayErrorObservable.prototype.subscribeCore = function (o) {
3836+
var group = new CompositeDisposable(),
3837+
m = new SingleAssignmentDisposable(),
3838+
state = { isStopped: false, errors: [], o: o };
3839+
3840+
group.add(m);
3841+
m.setDisposable(this.source.subscribe(new MergeDelayErrorObserver(group, state)));
3842+
3843+
return group;
3844+
};
3845+
3846+
return MergeDelayErrorObservable;
3847+
}(ObservableBase));
3848+
3849+
var MergeDelayErrorObserver = (function(__super__) {
3850+
inherits(MergeDelayErrorObserver, __super__);
3851+
function MergeDelayErrorObserver(group, state) {
3852+
this._group = group;
3853+
this._state = state;
3854+
__super__.call(this);
3855+
}
3856+
3857+
function setCompletion(o, errors) {
3858+
if (errors.length === 0) {
3859+
o.onCompleted();
3860+
} else if (errors.length === 1) {
3861+
o.onError(errors[0]);
3862+
} else {
3863+
o.onError(new CompositeError(errors));
3864+
}
3865+
}
3866+
3867+
MergeDelayErrorObserver.prototype.next = function (x) {
3868+
var inner = new SingleAssignmentDisposable();
3869+
this._group.add(inner);
3870+
3871+
// Check for promises support
3872+
isPromise(x) && (x = observableFromPromise(x));
3873+
inner.setDisposable(x.subscribe(new InnerObserver(inner, this._group, this._state)));
3874+
};
3875+
3876+
MergeDelayErrorObserver.prototype.error = function (e) {
3877+
this._state.errors.push(e);
3878+
this._state.isStopped = true;
3879+
this._group.length === 1 && setCompletion(this._state.o, this._state.errors);
3880+
};
3881+
3882+
MergeDelayErrorObserver.prototype.completed = function () {
3883+
this._state.isStopped = true;
3884+
this._group.length === 1 && setCompletion(this._state.o, this._state.errors);
3885+
};
3886+
3887+
inherits(InnerObserver, __super__);
3888+
function InnerObserver(inner, group, state) {
3889+
this._inner = inner;
3890+
this._group = group;
3891+
this._state = state;
3892+
__super__.call(this);
3893+
}
3894+
3895+
InnerObserver.prototype.next = function (x) { this._state.o.onNext(x); };
3896+
InnerObserver.prototype.error = function (e) {
3897+
this._state.errors.push(e);
3898+
this._group.remove(this._inner);
3899+
this._state.isStopped && this._group.length === 1 && setCompletion(this._state.o, this._state.errors);
3900+
};
3901+
InnerObserver.prototype.completed = function () {
3902+
this._group.remove(this._inner);
3903+
this._state.isStopped && this._group.length === 1 && setCompletion(this._state.o, this._state.errors);
3904+
};
3905+
3906+
return MergeDelayErrorObserver;
3907+
}(AbstractObserver));
38253908

38263909
/**
38273910
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
@@ -3844,56 +3927,7 @@ var ObserveOnObservable = (function (__super__) {
38443927
for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
38453928
}
38463929
var source = observableOf(null, args);
3847-
3848-
return new AnonymousObservable(function (o) {
3849-
var group = new CompositeDisposable(),
3850-
m = new SingleAssignmentDisposable(),
3851-
isStopped = false,
3852-
errors = [];
3853-
3854-
function setCompletion() {
3855-
if (errors.length === 0) {
3856-
o.onCompleted();
3857-
} else if (errors.length === 1) {
3858-
o.onError(errors[0]);
3859-
} else {
3860-
o.onError(new CompositeError(errors));
3861-
}
3862-
}
3863-
3864-
group.add(m);
3865-
3866-
m.setDisposable(source.subscribe(
3867-
function (innerSource) {
3868-
var innerSubscription = new SingleAssignmentDisposable();
3869-
group.add(innerSubscription);
3870-
3871-
// Check for promises support
3872-
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
3873-
3874-
innerSubscription.setDisposable(innerSource.subscribe(
3875-
function (x) { o.onNext(x); },
3876-
function (e) {
3877-
errors.push(e);
3878-
group.remove(innerSubscription);
3879-
isStopped && group.length === 1 && setCompletion();
3880-
},
3881-
function () {
3882-
group.remove(innerSubscription);
3883-
isStopped && group.length === 1 && setCompletion();
3884-
}));
3885-
},
3886-
function (e) {
3887-
errors.push(e);
3888-
isStopped = true;
3889-
group.length === 1 && setCompletion();
3890-
},
3891-
function () {
3892-
isStopped = true;
3893-
group.length === 1 && setCompletion();
3894-
}));
3895-
return group;
3896-
});
3930+
return new MergeDelayErrorObservable(source);
38973931
};
38983932

38993933
/**

dist/rx.all.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.min.js

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)