|
6 | 6 | return this.source.subscribe(observer); |
7 | 7 | } |
8 | 8 |
|
9 | | - function ControlledObservable (source, enableQueue) { |
| 9 | + function ControlledObservable (source, enableQueue, scheduler) { |
10 | 10 | __super__.call(this, subscribe, source); |
11 | | - this.subject = new ControlledSubject(enableQueue); |
| 11 | + this.subject = new ControlledSubject(enableQueue, scheduler); |
12 | 12 | this.source = source.multicast(this.subject).refCount(); |
13 | 13 | } |
14 | 14 |
|
|
29 | 29 |
|
30 | 30 | inherits(ControlledSubject, __super__); |
31 | 31 |
|
32 | | - function ControlledSubject(enableQueue) { |
| 32 | + function ControlledSubject(enableQueue, scheduler) { |
33 | 33 | enableQueue == null && (enableQueue = true); |
34 | 34 |
|
35 | 35 | __super__.call(this, subscribe); |
|
41 | 41 | this.error = null; |
42 | 42 | this.hasFailed = false; |
43 | 43 | this.hasCompleted = false; |
| 44 | + this.scheduler = scheduler || Rx.Scheduler['currentThread']; |
44 | 45 | } |
45 | 46 |
|
46 | 47 | addProperties(ControlledSubject.prototype, Observer, { |
|
83 | 84 | return { numberOfItems : numberOfItems, returnValue: this.queue.length !== 0}; |
84 | 85 | } |
85 | 86 |
|
86 | | - //TODO I don't think this is ever necessary, since termination of a sequence without a queue occurs in the onCompletion or onError function |
87 | | - //if (this.hasFailed) { |
88 | | - // this.subject.onError(this.error); |
89 | | - //} else if (this.hasCompleted) { |
90 | | - // this.subject.onCompleted(); |
91 | | - //} |
92 | | - |
93 | 87 | return { numberOfItems: numberOfItems, returnValue: false }; |
94 | 88 | }, |
95 | 89 | request: function (number) { |
96 | 90 | this.disposeCurrentRequest(); |
97 | | - var self = this, r = this._processRequest(number); |
| 91 | + var self = this; //r = this._processRequest(number); |
| 92 | + |
| 93 | + this.requestedDisposable = this.scheduler.scheduleWithState(number, |
| 94 | + function(s, i){ |
| 95 | + var r = self._processRequest(i); |
| 96 | + var remaining = r.numberOfItems; |
| 97 | + if (!r.returnValue) { |
| 98 | + self.requestedCount = remaining; |
| 99 | + self.requestedDisposable = disposableCreate(function(){ |
| 100 | + self.requestedCount = 0; |
| 101 | + }); |
| 102 | + } |
98 | 103 |
|
99 | | - var number = r.numberOfItems; |
100 | | - if (!r.returnValue) { |
101 | | - this.requestedCount = number; |
102 | | - this.requestedDisposable = disposableCreate(function () { |
103 | | - self.requestedCount = 0; |
104 | | - }); |
105 | 104 |
|
106 | | - return this.requestedDisposable; |
107 | | - } else { |
108 | | - return disposableEmpty; |
109 | | - } |
| 105 | + }); |
| 106 | + |
| 107 | + return this.requestedDisposable; |
| 108 | + |
110 | 109 | }, |
111 | 110 | disposeCurrentRequest: function () { |
112 | 111 | this.requestedDisposable.dispose(); |
|
122 | 121 | * @example |
123 | 122 | * var source = Rx.Observable.interval(100).controlled(); |
124 | 123 | * source.request(3); // Reads 3 values |
125 | | - * @param {Observable} pauser The observable sequence used to pause the underlying sequence. |
| 124 | + * @param {bool} enableQueue truthy value to determine if values should be queued pending the next request |
| 125 | + * @param {Scheduler} scheduler determines how the requests will be scheduled |
126 | 126 | * @returns {Observable} The observable sequence which is paused based upon the pauser. |
127 | 127 | */ |
128 | | - observableProto.controlled = function (enableQueue) { |
| 128 | + observableProto.controlled = function (enableQueue, scheduler) { |
| 129 | + |
| 130 | + if (enableQueue && isScheduler(enableQueue)) { |
| 131 | + scheduler = enableQueue; |
| 132 | + enableQueue = true; |
| 133 | + } |
| 134 | + |
129 | 135 | if (enableQueue == null) { enableQueue = true; } |
130 | | - return new ControlledObservable(this, enableQueue); |
| 136 | + return new ControlledObservable(this, enableQueue, scheduler); |
131 | 137 | }; |
0 commit comments