Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit 296d9ef

Browse files
Merge pull request #660 from paulpdaniels/controlled-async-fix
Fixed the controlled operator recursion issue by making requests async
2 parents 4457c51 + aedd6a0 commit 296d9ef

File tree

3 files changed

+90
-24
lines changed

3 files changed

+90
-24
lines changed

src/core/backpressure/controlled.js

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
return this.source.subscribe(observer);
77
}
88

9-
function ControlledObservable (source, enableQueue) {
9+
function ControlledObservable (source, enableQueue, scheduler) {
1010
__super__.call(this, subscribe, source);
11-
this.subject = new ControlledSubject(enableQueue);
11+
this.subject = new ControlledSubject(enableQueue, scheduler);
1212
this.source = source.multicast(this.subject).refCount();
1313
}
1414

@@ -29,7 +29,7 @@
2929

3030
inherits(ControlledSubject, __super__);
3131

32-
function ControlledSubject(enableQueue) {
32+
function ControlledSubject(enableQueue, scheduler) {
3333
enableQueue == null && (enableQueue = true);
3434

3535
__super__.call(this, subscribe);
@@ -41,6 +41,7 @@
4141
this.error = null;
4242
this.hasFailed = false;
4343
this.hasCompleted = false;
44+
this.scheduler = scheduler || Rx.Scheduler['currentThread'];
4445
}
4546

4647
addProperties(ControlledSubject.prototype, Observer, {
@@ -83,30 +84,28 @@
8384
return { numberOfItems : numberOfItems, returnValue: this.queue.length !== 0};
8485
}
8586

86-
//TODO I don't think this is ever necessary, since termination of a sequence without a queue occurs in the onCompletion or onError function
87-
//if (this.hasFailed) {
88-
// this.subject.onError(this.error);
89-
//} else if (this.hasCompleted) {
90-
// this.subject.onCompleted();
91-
//}
92-
9387
return { numberOfItems: numberOfItems, returnValue: false };
9488
},
9589
request: function (number) {
9690
this.disposeCurrentRequest();
97-
var self = this, r = this._processRequest(number);
91+
var self = this; //r = this._processRequest(number);
92+
93+
this.requestedDisposable = this.scheduler.scheduleWithState(number,
94+
function(s, i){
95+
var r = self._processRequest(i);
96+
var remaining = r.numberOfItems;
97+
if (!r.returnValue) {
98+
self.requestedCount = remaining;
99+
self.requestedDisposable = disposableCreate(function(){
100+
self.requestedCount = 0;
101+
});
102+
}
98103

99-
var number = r.numberOfItems;
100-
if (!r.returnValue) {
101-
this.requestedCount = number;
102-
this.requestedDisposable = disposableCreate(function () {
103-
self.requestedCount = 0;
104-
});
105104

106-
return this.requestedDisposable;
107-
} else {
108-
return disposableEmpty;
109-
}
105+
});
106+
107+
return this.requestedDisposable;
108+
110109
},
111110
disposeCurrentRequest: function () {
112111
this.requestedDisposable.dispose();
@@ -122,10 +121,17 @@
122121
* @example
123122
* var source = Rx.Observable.interval(100).controlled();
124123
* source.request(3); // Reads 3 values
125-
* @param {Observable} pauser The observable sequence used to pause the underlying sequence.
124+
* @param {bool} enableQueue truthy value to determine if values should be queued pending the next request
125+
* @param {Scheduler} scheduler determines how the requests will be scheduled
126126
* @returns {Observable} The observable sequence which is paused based upon the pauser.
127127
*/
128-
observableProto.controlled = function (enableQueue) {
128+
observableProto.controlled = function (enableQueue, scheduler) {
129+
130+
if (enableQueue && isScheduler(enableQueue)) {
131+
scheduler = enableQueue;
132+
enableQueue = true;
133+
}
134+
129135
if (enableQueue == null) { enableQueue = true; }
130-
return new ControlledObservable(this, enableQueue);
136+
return new ControlledObservable(this, enableQueue, scheduler);
131137
};

src/core/headers/backpressureheader.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,6 @@
1313
timeoutScheduler = Rx.Scheduler.timeout,
1414
currentThreadScheduler = Rx.Scheduler.currentThread,
1515
identity = Rx.helpers.identity,
16+
//TODO Get some consistency about where this is declared
17+
isScheduler = Rx.helpers.isScheduler || Rx.Scheduler.isScheduler,
1618
checkDisposed = Rx.Disposable.checkDisposed;

tests/observable/controlled.js

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,3 +217,61 @@ test('controlled drops messages with queue disabled', function(){
217217

218218

219219
});
220+
221+
test('controlled requests are scheduled', function() {
222+
var scheduler = new TestScheduler();
223+
var results = scheduler.createObserver();
224+
225+
var xs = scheduler
226+
.createHotObservable(
227+
onNext(210, 0),
228+
onNext(220, 1),
229+
onNext(230, 2),
230+
onNext(240, 3),
231+
onNext(250, 4),
232+
onNext(260, 5),
233+
onNext(270, 6),
234+
onCompleted(280)
235+
236+
);
237+
var source = xs.controlled();
238+
239+
// process one event at a time
240+
scheduler.scheduleAbsolute(200, function() {
241+
var subscription =
242+
source.subscribe(
243+
function (x) {
244+
// alternate between immediate and delayed request(1), causes hanging
245+
if (x % 2) {
246+
//Immediate
247+
source.request(1); // request next
248+
} else {
249+
//Delayed
250+
scheduler.schedule(function () {
251+
source.request(1); // request next
252+
});
253+
}
254+
results.onNext(x);
255+
},
256+
results.onError.bind(results),
257+
results.onCompleted.bind(results)
258+
);
259+
});
260+
261+
scheduler.scheduleAbsolute(300, function() {
262+
source.request(1); // start by requesting first item
263+
});
264+
265+
scheduler.start();
266+
267+
results.messages.assertEqual(
268+
onNext(300, 0),
269+
onNext(301, 1),
270+
onNext(301, 2),
271+
onNext(302, 3),
272+
onNext(302, 4),
273+
onNext(303, 5),
274+
onNext(303, 6),
275+
onCompleted(303)
276+
);
277+
});

0 commit comments

Comments
 (0)