Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit 6b57fff

Browse files
committed
* Cleaned out old code
* Added more unit tests
1 parent 700e797 commit 6b57fff

2 files changed

Lines changed: 100 additions & 10 deletions

File tree

src/core/backpressure/controlled.js

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
this.subject.onCompleted();
5151
else
5252
this.queue.push(Rx.Notification.createOnCompleted());
53-
//(!this.enableQueue || this.queue.length === 0) && this.subject.onCompleted();
5453
},
5554
onError: function (error) {
5655
this.hasFailed = true;
@@ -59,7 +58,6 @@
5958
this.subject.onError(error);
6059
else
6160
this.queue.push(Rx.Notification.createOnError(error));
62-
//(!this.enableQueue || this.queue.length === 0) && this.subject.onError(error);
6361
},
6462
onNext: function (value) {
6563
var hasRequested = false;
@@ -74,12 +72,8 @@
7472
},
7573
_processRequest: function (numberOfItems) {
7674
if (this.enableQueue) {
77-
//while (this.queue.length >= numberOfItems && numberOfItems > 0) {
78-
// this.subject.onNext(this.queue.shift());
79-
// numberOfItems--;
80-
//}
81-
82-
while ((this.queue.length > 0 && this.queue[0].kind !== 'N') || (this.queue.length >= numberOfItems && numberOfItems > 0)) {
75+
while ((this.queue.length >= numberOfItems && numberOfItems > 0) ||
76+
(this.queue.length > 0 && this.queue[0].kind !== 'N')) {
8377
var first = this.queue.shift();
8478
first.accept(this.subject);
8579
if (first.kind === 'N') numberOfItems--;
@@ -110,7 +104,7 @@
110104
self.requestedCount = 0;
111105
});
112106

113-
return this.requestedDisposable
107+
return this.requestedDisposable;
114108
} else {
115109
return disposableEmpty;
116110
}

tests/observable/controlled.js

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ test('controlled fires on completed', function(){
107107
source.subscribe(results);
108108
});
109109

110-
111110
scheduler.scheduleAbsolute(300, function () {
112111
source.request(3);
113112
});
@@ -119,5 +118,102 @@ test('controlled fires on completed', function(){
119118
onNext(300, 2),
120119
onCompleted(300)
121120
);
121+
});
122+
123+
test('controlled cancels inflight request', function(){
124+
125+
var scheduler = new TestScheduler();
126+
127+
var source = scheduler.createHotObservable(
128+
onNext(400, 2),
129+
onNext(410, 3),
130+
onNext(420, 4)
131+
).controlled();
132+
133+
var results = scheduler.createObserver();
134+
135+
scheduler.scheduleAbsolute(200, function(){
136+
source.request(3);
137+
});
138+
139+
scheduler.scheduleAbsolute(200, function(){
140+
source.request(2);
141+
});
142+
143+
scheduler.scheduleAbsolute(300, function(){
144+
source.subscribe(results);
145+
});
146+
147+
scheduler.advanceBy(420);
148+
149+
150+
results.messages.assertEqual(
151+
onNext(400, 2),
152+
onNext(410, 3)
153+
);
154+
122155

123156
});
157+
158+
test('controlled fires onError', function(){
159+
160+
var scheduler = new TestScheduler();
161+
var results = scheduler.createObserver();
162+
163+
var error = new Error("expected");
164+
var source = Rx.Observable.range(1, 2, scheduler)
165+
.concat(Rx.Observable.throw(error, scheduler))
166+
.controlled();
167+
168+
scheduler.scheduleAbsolute(200, function(){
169+
source.subscribe(results);
170+
});
171+
172+
173+
scheduler.scheduleAbsolute(300, function () {
174+
source.request(3);
175+
});
176+
177+
scheduler.start();
178+
179+
results.messages.assertEqual(
180+
onNext(300, 1),
181+
onNext(300, 2),
182+
onError(300, error)
183+
);
184+
185+
});
186+
187+
test('controlled drops messages with queue disabled', function(){
188+
189+
var scheduler = new TestScheduler();
190+
191+
var source = scheduler.createHotObservable(
192+
onNext(400, 1),
193+
onNext(410, 2),
194+
onNext(420, 3),
195+
onNext(430, 4),
196+
onCompleted(500)
197+
).controlled(false);
198+
199+
var results = scheduler.createObserver();
200+
201+
202+
scheduler.scheduleAbsolute(415, function(){
203+
source.request(2);
204+
});
205+
206+
scheduler.scheduleAbsolute(200, function(){
207+
source.subscribe(results);
208+
});
209+
210+
scheduler.start();
211+
212+
results.messages.assertEqual(
213+
onNext(420, 3),
214+
onNext(430, 4),
215+
onCompleted(500)
216+
);
217+
218+
219+
});

0 commit comments

Comments
 (0)