Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit 700e797

Browse files
committed
* Switched to notifications for onComplete and onError
* Removed extraneous disposable * Added new unit tests
1 parent 60afd46 commit 700e797

3 files changed

Lines changed: 49 additions & 12 deletions

File tree

Gruntfile.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1674,7 +1674,8 @@ module.exports = function (grunt) {
16741674
}
16751675
},
16761676
qunit: {
1677-
all: ['tests/*.html']
1677+
all: ['tests/*.html'],
1678+
controlled : ['tests/rx.backpressure.html']
16781679
},
16791680
jshint: {
16801681
all: [

src/core/backpressure/controlled.js

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,31 @@
4141
this.error = null;
4242
this.hasFailed = false;
4343
this.hasCompleted = false;
44-
this.controlledDisposable = disposableEmpty;
4544
}
4645

4746
addProperties(ControlledSubject.prototype, Observer, {
4847
onCompleted: function () {
4948
this.hasCompleted = true;
50-
(!this.enableQueue || this.queue.length === 0) && this.subject.onCompleted();
49+
if (!this.enableQueue || this.queue.length === 0)
50+
this.subject.onCompleted();
51+
else
52+
this.queue.push(Rx.Notification.createOnCompleted());
53+
//(!this.enableQueue || this.queue.length === 0) && this.subject.onCompleted();
5154
},
5255
onError: function (error) {
5356
this.hasFailed = true;
5457
this.error = error;
55-
(!this.enableQueue || this.queue.length === 0) && this.subject.onError(error);
58+
if (!this.enableQueue || this.queue.length === 0)
59+
this.subject.onError(error);
60+
else
61+
this.queue.push(Rx.Notification.createOnError(error));
62+
//(!this.enableQueue || this.queue.length === 0) && this.subject.onError(error);
5663
},
5764
onNext: function (value) {
5865
var hasRequested = false;
5966

6067
if (this.requestedCount === 0) {
61-
this.enableQueue && this.queue.push(value);
68+
this.enableQueue && this.queue.push(Rx.Notification.createOnNext(value));
6269
} else {
6370
(this.requestedCount !== -1 && this.requestedCount-- === 0) && this.disposeCurrentRequest();
6471
hasRequested = true;
@@ -67,9 +74,16 @@
6774
},
6875
_processRequest: function (numberOfItems) {
6976
if (this.enableQueue) {
70-
while (this.queue.length >= numberOfItems && numberOfItems > 0) {
71-
this.subject.onNext(this.queue.shift());
72-
numberOfItems--;
77+
//while (this.queue.length >= numberOfItems && numberOfItems > 0) {
78+
// this.subject.onNext(this.queue.shift());
79+
// numberOfItems--;
80+
//}
81+
82+
while ((this.queue.length > 0 && this.queue[0].kind !== 'N') || (this.queue.length >= numberOfItems && numberOfItems > 0)) {
83+
var first = this.queue.shift();
84+
first.accept(this.subject);
85+
if (first.kind === 'N') numberOfItems--;
86+
else { this.disposeCurrentRequest(); this.queue = []; }
7387
}
7488

7589
return this.queue.length !== 0 ?
@@ -79,12 +93,8 @@
7993

8094
if (this.hasFailed) {
8195
this.subject.onError(this.error);
82-
this.controlledDisposable.dispose();
83-
this.controlledDisposable = disposableEmpty;
8496
} else if (this.hasCompleted) {
8597
this.subject.onCompleted();
86-
this.controlledDisposable.dispose();
87-
this.controlledDisposable = disposableEmpty;
8898
}
8999

90100
return { numberOfItems: numberOfItems, returnValue: false };

tests/observable/controlled.js

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,29 @@ test('controlled gets two sets of values', function () {
9595
onCompleted(500)
9696
);
9797
});
98+
99+
test('controlled fires on completed', function(){
100+
101+
var scheduler = new TestScheduler();
102+
var results = scheduler.createObserver();
103+
104+
var source = Rx.Observable.range(1, 2).controlled();
105+
106+
scheduler.scheduleAbsolute(200, function(){
107+
source.subscribe(results);
108+
});
109+
110+
111+
scheduler.scheduleAbsolute(300, function () {
112+
source.request(3);
113+
});
114+
115+
scheduler.start();
116+
117+
results.messages.assertEqual(
118+
onNext(300, 1),
119+
onNext(300, 2),
120+
onCompleted(300)
121+
);
122+
123+
});

0 commit comments

Comments
 (0)