Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit a147d64

Browse files
Merge pull request #608 from paulpdaniels/controlled-completed-fix
Controlled completed fix
2 parents 970aa57 + ef7fea0 commit a147d64

2 files changed

Lines changed: 145 additions & 20 deletions

File tree

src/core/backpressure/controlled.js

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,29 @@
4141
this.error = null;
4242
this.hasFailed = false;
4343
this.hasCompleted = false;
44-
this.controlledDisposable = disposableEmpty;
4544
}
4645

4746
addProperties(ControlledSubject.prototype, Observer, {
4847
onCompleted: function () {
4948
this.hasCompleted = true;
50-
(!this.enableQueue || this.queue.length === 0) && this.subject.onCompleted();
49+
if (!this.enableQueue || this.queue.length === 0)
50+
this.subject.onCompleted();
51+
else
52+
this.queue.push(Rx.Notification.createOnCompleted());
5153
},
5254
onError: function (error) {
5355
this.hasFailed = true;
5456
this.error = error;
55-
(!this.enableQueue || this.queue.length === 0) && this.subject.onError(error);
57+
if (!this.enableQueue || this.queue.length === 0)
58+
this.subject.onError(error);
59+
else
60+
this.queue.push(Rx.Notification.createOnError(error));
5661
},
5762
onNext: function (value) {
5863
var hasRequested = false;
5964

6065
if (this.requestedCount === 0) {
61-
this.enableQueue && this.queue.push(value);
66+
this.enableQueue && this.queue.push(Rx.Notification.createOnNext(value));
6267
} else {
6368
(this.requestedCount !== -1 && this.requestedCount-- === 0) && this.disposeCurrentRequest();
6469
hasRequested = true;
@@ -67,25 +72,23 @@
6772
},
6873
_processRequest: function (numberOfItems) {
6974
if (this.enableQueue) {
70-
while (this.queue.length >= numberOfItems && numberOfItems > 0) {
71-
this.subject.onNext(this.queue.shift());
72-
numberOfItems--;
75+
while ((this.queue.length >= numberOfItems && numberOfItems > 0) ||
76+
(this.queue.length > 0 && this.queue[0].kind !== 'N')) {
77+
var first = this.queue.shift();
78+
first.accept(this.subject);
79+
if (first.kind === 'N') numberOfItems--;
80+
else { this.disposeCurrentRequest(); this.queue = []; }
7381
}
7482

75-
return this.queue.length !== 0 ?
76-
{ numberOfItems: numberOfItems, returnValue: true } :
77-
{ numberOfItems: numberOfItems, returnValue: false };
83+
return { numberOfItems : numberOfItems, returnValue: this.queue.length !== 0};
7884
}
7985

80-
if (this.hasFailed) {
81-
this.subject.onError(this.error);
82-
this.controlledDisposable.dispose();
83-
this.controlledDisposable = disposableEmpty;
84-
} else if (this.hasCompleted) {
85-
this.subject.onCompleted();
86-
this.controlledDisposable.dispose();
87-
this.controlledDisposable = disposableEmpty;
88-
}
86+
//TODO I don't think this is ever necessary, since termination of a sequence without a queue occurs in the onCompletion or onError function
87+
//if (this.hasFailed) {
88+
// this.subject.onError(this.error);
89+
//} else if (this.hasCompleted) {
90+
// this.subject.onCompleted();
91+
//}
8992

9093
return { numberOfItems: numberOfItems, returnValue: false };
9194
},
@@ -100,7 +103,7 @@
100103
self.requestedCount = 0;
101104
});
102105

103-
return this.requestedDisposable
106+
return this.requestedDisposable;
104107
} else {
105108
return disposableEmpty;
106109
}

tests/observable/controlled.js

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,125 @@ test('controlled gets two sets of values', function () {
9595
onCompleted(500)
9696
);
9797
});
98+
99+
test('controlled fires on completed', function(){
100+
101+
var scheduler = new TestScheduler();
102+
var results = scheduler.createObserver();
103+
104+
var source = Rx.Observable.range(1, 2).controlled();
105+
106+
scheduler.scheduleAbsolute(200, function(){
107+
source.subscribe(results);
108+
});
109+
110+
scheduler.scheduleAbsolute(300, function () {
111+
source.request(3);
112+
});
113+
114+
scheduler.start();
115+
116+
results.messages.assertEqual(
117+
onNext(300, 1),
118+
onNext(300, 2),
119+
onCompleted(300)
120+
);
121+
});
122+
123+
test('controlled cancels inflight request', function(){
124+
125+
var scheduler = new TestScheduler();
126+
127+
var source = scheduler.createHotObservable(
128+
onNext(400, 2),
129+
onNext(410, 3),
130+
onNext(420, 4)
131+
).controlled();
132+
133+
var results = scheduler.createObserver();
134+
135+
scheduler.scheduleAbsolute(200, function(){
136+
source.request(3);
137+
});
138+
139+
scheduler.scheduleAbsolute(200, function(){
140+
source.request(2);
141+
});
142+
143+
scheduler.scheduleAbsolute(300, function(){
144+
source.subscribe(results);
145+
});
146+
147+
scheduler.advanceBy(420);
148+
149+
150+
results.messages.assertEqual(
151+
onNext(400, 2),
152+
onNext(410, 3)
153+
);
154+
155+
156+
});
157+
158+
test('controlled fires onError', function(){
159+
160+
var scheduler = new TestScheduler();
161+
var results = scheduler.createObserver();
162+
163+
var error = new Error("expected");
164+
var source = Rx.Observable.range(1, 2, scheduler)
165+
.concat(Rx.Observable.throw(error, scheduler))
166+
.controlled();
167+
168+
scheduler.scheduleAbsolute(200, function(){
169+
source.subscribe(results);
170+
});
171+
172+
173+
scheduler.scheduleAbsolute(300, function () {
174+
source.request(3);
175+
});
176+
177+
scheduler.start();
178+
179+
results.messages.assertEqual(
180+
onNext(300, 1),
181+
onNext(300, 2),
182+
onError(300, error)
183+
);
184+
185+
});
186+
187+
test('controlled drops messages with queue disabled', function(){
188+
189+
var scheduler = new TestScheduler();
190+
191+
var source = scheduler.createHotObservable(
192+
onNext(400, 1),
193+
onNext(410, 2),
194+
onNext(420, 3),
195+
onNext(430, 4),
196+
onCompleted(500)
197+
).controlled(false);
198+
199+
var results = scheduler.createObserver();
200+
201+
202+
scheduler.scheduleAbsolute(415, function(){
203+
source.request(2);
204+
});
205+
206+
scheduler.scheduleAbsolute(200, function(){
207+
source.subscribe(results);
208+
});
209+
210+
scheduler.start();
211+
212+
results.messages.assertEqual(
213+
onNext(420, 3),
214+
onNext(430, 4),
215+
onCompleted(500)
216+
);
217+
218+
219+
});

0 commit comments

Comments
 (0)