Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit 70ba255

Browse files
Adding observer to concat
1 parent a25c845 commit 70ba255

25 files changed

Lines changed: 320 additions & 87 deletions

dist/rx.all.compat.js

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2266,18 +2266,43 @@
22662266

22672267
var d = new SingleAssignmentDisposable();
22682268
subscription.setDisposable(d);
2269-
d.setDisposable(currentValue.subscribe(
2270-
function(x) { o.onNext(x); },
2271-
function(err) { o.onError(err); },
2272-
function () { self(e); })
2273-
);
2269+
d.setDisposable(currentValue.subscribe(new InnerObserver(o, self, e)));
22742270
});
22752271

22762272
return new CompositeDisposable(subscription, cancelable, disposableCreate(function () {
22772273
isDisposed = true;
22782274
}));
22792275
};
22802276

2277+
function InnerObserver(o, s, e) {
2278+
this.o = o;
2279+
this.s = s;
2280+
this.e = e;
2281+
this.isStopped = false;
2282+
}
2283+
InnerObserver.prototype.onNext = function (x) { if(!this.isStopped) { this.o.onNext(x); } };
2284+
InnerObserver.prototype.onError = function (err) {
2285+
if (!this.isStopped) {
2286+
this.isStopped = true;
2287+
this.o.onError(err);
2288+
}
2289+
};
2290+
InnerObserver.prototype.onCompleted = function () {
2291+
if (!this.isStopped) {
2292+
this.isStopped = true;
2293+
this.s(this.e);
2294+
}
2295+
};
2296+
InnerObserver.prototype.dispose = function () { this.isStopped = true; };
2297+
InnerObserver.prototype.fail = function (err) {
2298+
if (!this.isStopped) {
2299+
this.isStopped = true;
2300+
this.o.onError(err);
2301+
return true;
2302+
}
2303+
return false;
2304+
};
2305+
22812306
return ConcatObservable;
22822307
}(ObservableBase));
22832308

@@ -2338,7 +2363,8 @@
23382363
subscription = new SerialDisposable();
23392364
var cancelable = immediateScheduler.scheduleRecursive(function (self) {
23402365
if (isDisposed) { return; }
2341-
var currentItem = e.next();
2366+
var currentItem = tryCatch(e.next).call(e);
2367+
if (currentItem === errorObj) { return o.onError(currentItem.e); }
23422368

23432369
if (currentItem.done) {
23442370
if (lastException) {

dist/rx.all.compat.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.compat.min.js

Lines changed: 3 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.js

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2075,18 +2075,43 @@
20752075

20762076
var d = new SingleAssignmentDisposable();
20772077
subscription.setDisposable(d);
2078-
d.setDisposable(currentValue.subscribe(
2079-
function(x) { o.onNext(x); },
2080-
function(err) { o.onError(err); },
2081-
function () { self(e); })
2082-
);
2078+
d.setDisposable(currentValue.subscribe(new InnerObserver(o, self, e)));
20832079
});
20842080

20852081
return new CompositeDisposable(subscription, cancelable, disposableCreate(function () {
20862082
isDisposed = true;
20872083
}));
20882084
};
20892085

2086+
function InnerObserver(o, s, e) {
2087+
this.o = o;
2088+
this.s = s;
2089+
this.e = e;
2090+
this.isStopped = false;
2091+
}
2092+
InnerObserver.prototype.onNext = function (x) { if(!this.isStopped) { this.o.onNext(x); } };
2093+
InnerObserver.prototype.onError = function (err) {
2094+
if (!this.isStopped) {
2095+
this.isStopped = true;
2096+
this.o.onError(err);
2097+
}
2098+
};
2099+
InnerObserver.prototype.onCompleted = function () {
2100+
if (!this.isStopped) {
2101+
this.isStopped = true;
2102+
this.s(this.e);
2103+
}
2104+
};
2105+
InnerObserver.prototype.dispose = function () { this.isStopped = true; };
2106+
InnerObserver.prototype.fail = function (err) {
2107+
if (!this.isStopped) {
2108+
this.isStopped = true;
2109+
this.o.onError(err);
2110+
return true;
2111+
}
2112+
return false;
2113+
};
2114+
20902115
return ConcatObservable;
20912116
}(ObservableBase));
20922117

@@ -2147,7 +2172,8 @@
21472172
subscription = new SerialDisposable();
21482173
var cancelable = immediateScheduler.scheduleRecursive(function (self) {
21492174
if (isDisposed) { return; }
2150-
var currentItem = e.next();
2175+
var currentItem = tryCatch(e.next).call(e);
2176+
if (currentItem === errorObj) { return o.onError(currentItem.e); }
21512177

21522178
if (currentItem.done) {
21532179
if (lastException) {

dist/rx.all.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.min.js

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.compat.js

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2266,18 +2266,43 @@
22662266

22672267
var d = new SingleAssignmentDisposable();
22682268
subscription.setDisposable(d);
2269-
d.setDisposable(currentValue.subscribe(
2270-
function(x) { o.onNext(x); },
2271-
function(err) { o.onError(err); },
2272-
function () { self(e); })
2273-
);
2269+
d.setDisposable(currentValue.subscribe(new InnerObserver(o, self, e)));
22742270
});
22752271

22762272
return new CompositeDisposable(subscription, cancelable, disposableCreate(function () {
22772273
isDisposed = true;
22782274
}));
22792275
};
22802276

2277+
function InnerObserver(o, s, e) {
2278+
this.o = o;
2279+
this.s = s;
2280+
this.e = e;
2281+
this.isStopped = false;
2282+
}
2283+
InnerObserver.prototype.onNext = function (x) { if(!this.isStopped) { this.o.onNext(x); } };
2284+
InnerObserver.prototype.onError = function (err) {
2285+
if (!this.isStopped) {
2286+
this.isStopped = true;
2287+
this.o.onError(err);
2288+
}
2289+
};
2290+
InnerObserver.prototype.onCompleted = function () {
2291+
if (!this.isStopped) {
2292+
this.isStopped = true;
2293+
this.s(this.e);
2294+
}
2295+
};
2296+
InnerObserver.prototype.dispose = function () { this.isStopped = true; };
2297+
InnerObserver.prototype.fail = function (err) {
2298+
if (!this.isStopped) {
2299+
this.isStopped = true;
2300+
this.o.onError(err);
2301+
return true;
2302+
}
2303+
return false;
2304+
};
2305+
22812306
return ConcatObservable;
22822307
}(ObservableBase));
22832308

@@ -2338,7 +2363,8 @@
23382363
subscription = new SerialDisposable();
23392364
var cancelable = immediateScheduler.scheduleRecursive(function (self) {
23402365
if (isDisposed) { return; }
2341-
var currentItem = e.next();
2366+
var currentItem = tryCatch(e.next).call(e);
2367+
if (currentItem === errorObj) { return o.onError(currentItem.e); }
23422368

23432369
if (currentItem.done) {
23442370
if (lastException) {

dist/rx.compat.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.compat.min.js

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.js

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2075,18 +2075,43 @@
20752075

20762076
var d = new SingleAssignmentDisposable();
20772077
subscription.setDisposable(d);
2078-
d.setDisposable(currentValue.subscribe(
2079-
function(x) { o.onNext(x); },
2080-
function(err) { o.onError(err); },
2081-
function () { self(e); })
2082-
);
2078+
d.setDisposable(currentValue.subscribe(new InnerObserver(o, self, e)));
20832079
});
20842080

20852081
return new CompositeDisposable(subscription, cancelable, disposableCreate(function () {
20862082
isDisposed = true;
20872083
}));
20882084
};
20892085

2086+
function InnerObserver(o, s, e) {
2087+
this.o = o;
2088+
this.s = s;
2089+
this.e = e;
2090+
this.isStopped = false;
2091+
}
2092+
InnerObserver.prototype.onNext = function (x) { if(!this.isStopped) { this.o.onNext(x); } };
2093+
InnerObserver.prototype.onError = function (err) {
2094+
if (!this.isStopped) {
2095+
this.isStopped = true;
2096+
this.o.onError(err);
2097+
}
2098+
};
2099+
InnerObserver.prototype.onCompleted = function () {
2100+
if (!this.isStopped) {
2101+
this.isStopped = true;
2102+
this.s(this.e);
2103+
}
2104+
};
2105+
InnerObserver.prototype.dispose = function () { this.isStopped = true; };
2106+
InnerObserver.prototype.fail = function (err) {
2107+
if (!this.isStopped) {
2108+
this.isStopped = true;
2109+
this.o.onError(err);
2110+
return true;
2111+
}
2112+
return false;
2113+
};
2114+
20902115
return ConcatObservable;
20912116
}(ObservableBase));
20922117

@@ -2147,7 +2172,8 @@
21472172
subscription = new SerialDisposable();
21482173
var cancelable = immediateScheduler.scheduleRecursive(function (self) {
21492174
if (isDisposed) { return; }
2150-
var currentItem = e.next();
2175+
var currentItem = tryCatch(e.next).call(e);
2176+
if (currentItem === errorObj) { return o.onError(currentItem.e); }
21512177

21522178
if (currentItem.done) {
21532179
if (lastException) {

0 commit comments

Comments
 (0)