Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit 26cf279

Browse files
backpressure
1 parent 93e36a2 commit 26cf279

10 files changed

Lines changed: 527 additions & 6 deletions

File tree

src/core/backpressure/stopandwait.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
}
77

88
function scheduleMethod(s, self) {
9-
self.source.request(1);
9+
return self.source.request(1);
1010
}
1111

1212
StopAndWaitObservable.prototype._subscribe = function (o) {
@@ -38,15 +38,15 @@
3838
};
3939

4040
function innerScheduleMethod(s, self) {
41-
self.observable.source.request(1);
41+
return self.observable.source.request(1);
4242
}
4343

4444
StopAndWaitObserver.prototype.next = function (value) {
4545
this.observer.onNext(value);
4646
this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod);
4747
};
4848

49-
StopAndWaitObservable.dispose = function () {
49+
StopAndWaitObserver.dispose = function () {
5050
this.observer = null;
5151
if (this.cancel) {
5252
this.cancel.dispose();

src/core/backpressure/windowed.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
}
88

99
function scheduleMethod(s, self) {
10-
self.source.request(self.windowSize);
10+
return self.source.request(self.windowSize);
1111
}
1212

1313
WindowedObservable.prototype._subscribe = function (o) {
@@ -40,7 +40,7 @@
4040
};
4141

4242
function innerScheduleMethod(s, self) {
43-
self.observable.source.request(self.observable.windowSize);
43+
return self.observable.source.request(self.observable.windowSize);
4444
}
4545

4646
WindowedObserver.prototype.next = function (value) {

src/modular/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ Observable.addToObject({
4545
timer: require('./observable/timer'),
4646
toAsync: require('./observable/toasync'),
4747
when: require('./observable/when'),
48+
using: require('./observable/using'),
4849
zip: require('./observable/zip')
4950
});
5051

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
'use strict';
2+
3+
var Observable = require('../observable');
4+
var StopAndWaitObservable = require('./stopandwait');
5+
var WindowedObservable = require('./windowed');
6+
var multicast = require('./multicast');
7+
var Notification = require('../notification');
8+
var Observer = require('../observer');
9+
var Subject = require('../subject');
10+
var Disposable = require('../disposable');
11+
var Scheduler = require('../scheduler');
12+
var addProperties = require('../internal/addproperties');
13+
var inherits = require('inherits');
14+
15+
global._Rx || (global._Rx = {});
16+
if (!global._Rx.currentThreadScheduler) {
17+
require('../scheduler/currentthreadscheduler');
18+
}
19+
20+
if (!global._Rx.defaultScheduler) {
21+
require('../scheduler/defaultscheduler');
22+
}
23+
24+
function ControlledSubject(enableQueue, scheduler) {
25+
enableQueue == null && (enableQueue = true);
26+
this.subject = new Subject();
27+
this.enableQueue = enableQueue;
28+
this.queue = enableQueue ? [] : null;
29+
this.requestedCount = 0;
30+
this.requestedDisposable = null;
31+
this.error = null;
32+
this.hasFailed = false;
33+
this.hasCompleted = false;
34+
this.scheduler = scheduler || global._Rx.currentThreadScheduler;
35+
Observable.call(this);
36+
}
37+
38+
inherits(ControlledSubject, Observable);
39+
40+
addProperties(ControlledSubject.prototype, Observer, {
41+
_subscribe: function (o) {
42+
return this.subject.subscribe(o);
43+
},
44+
onCompleted: function () {
45+
this.hasCompleted = true;
46+
if (!this.enableQueue || this.queue.length === 0) {
47+
this.subject.onCompleted();
48+
this.disposeCurrentRequest();
49+
} else {
50+
this.queue.push(Notification.createOnCompleted());
51+
}
52+
},
53+
onError: function (error) {
54+
this.hasFailed = true;
55+
this.error = error;
56+
if (!this.enableQueue || this.queue.length === 0) {
57+
this.subject.onError(error);
58+
this.disposeCurrentRequest();
59+
} else {
60+
this.queue.push(Notification.createOnError(error));
61+
}
62+
},
63+
onNext: function (value) {
64+
if (this.requestedCount <= 0) {
65+
this.enableQueue && this.queue.push(Notification.createOnNext(value));
66+
} else {
67+
(this.requestedCount-- === 0) && this.disposeCurrentRequest();
68+
this.subject.onNext(value);
69+
}
70+
},
71+
_processRequest: function (numberOfItems) {
72+
if (this.enableQueue) {
73+
while (this.queue.length > 0 && (numberOfItems > 0 || this.queue[0].kind !== 'N')) {
74+
var first = this.queue.shift();
75+
first.accept(this.subject);
76+
if (first.kind === 'N') {
77+
numberOfItems--;
78+
} else {
79+
this.disposeCurrentRequest();
80+
this.queue = [];
81+
}
82+
}
83+
}
84+
85+
return numberOfItems;
86+
},
87+
request: function (number) {
88+
this.disposeCurrentRequest();
89+
var self = this;
90+
91+
this.requestedDisposable = this.scheduler.schedule(number,
92+
function(s, i) {
93+
var remaining = self._processRequest(i);
94+
var stopped = self.hasCompleted || self.hasFailed;
95+
if (!stopped && remaining > 0) {
96+
self.requestedCount = remaining;
97+
98+
return Disposable.create(function () {
99+
self.requestedCount = 0;
100+
});
101+
// Scheduled item is still in progress. Return a new
102+
// disposable to allow the request to be interrupted
103+
// via dispose.
104+
}
105+
});
106+
107+
return this.requestedDisposable;
108+
},
109+
disposeCurrentRequest: function () {
110+
if (this.requestedDisposable) {
111+
this.requestedDisposable.dispose();
112+
this.requestedDisposable = null;
113+
}
114+
}
115+
});
116+
117+
function ControlledObservable (source, enableQueue, scheduler) {
118+
this.subject = new ControlledSubject(enableQueue, scheduler);
119+
this.source = multicast(source, this.subject).refCount();
120+
Observable.call(this);
121+
}
122+
123+
inherits(ControlledObservable, Observable);
124+
125+
ControlledObservable.prototype._subscribe = function (o) {
126+
return this.source.subscribe(o);
127+
};
128+
129+
ControlledObservable.prototype.request = function (numberOfItems) {
130+
return this.subject.request(numberOfItems == null ? -1 : numberOfItems);
131+
};
132+
133+
ControlledObservable.prototype.stopAndWait = function (scheduler) {
134+
Scheduler.isScheduler(scheduler) || (scheduler = global._Rx.defaultScheduler);
135+
return new StopAndWaitObservable(this, scheduler);
136+
};
137+
138+
ControlledObservable.prototype.windowed = function (windowSize, scheduler) {
139+
Scheduler.isScheduler(scheduler) || (scheduler = global._Rx.defaultScheduler);
140+
return new WindowedObservable(this, windowSize, scheduler);
141+
};
142+
143+
module.exports = function controlled (source, enableQueue, scheduler) {
144+
145+
if (enableQueue && Scheduler.isScheduler(enableQueue)) {
146+
scheduler = enableQueue;
147+
enableQueue = true;
148+
}
149+
150+
if (enableQueue == null) { enableQueue = true; }
151+
return new ControlledObservable(source, enableQueue, scheduler);
152+
};

src/modular/observable/pausable.js

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
'use strict';
2+
3+
var Observable = require('../observable');
4+
var distinctUntilChanged = require('./distinctuntilchanged');
5+
var merge = require('./merge');
6+
var publish = require('./publish');
7+
var Subject = require('../subject');
8+
var Disposable = require('../disposable');
9+
var NAryDisposable = require('../narydisposable');
10+
var inherits = require('inherits');
11+
12+
function PausableObservable(source, pauser) {
13+
this.source = source;
14+
this.controller = new Subject();
15+
16+
if (pauser && pauser.subscribe) {
17+
this.pauser = merge(this.controller, pauser);
18+
} else {
19+
this.pauser = this.controller;
20+
}
21+
22+
Observable.call(this);
23+
}
24+
25+
inherits(PausableObservable, Observable);
26+
27+
PausableObservable.prototype._subscribe = function (o) {
28+
var conn = publish(this.source),
29+
subscription = conn.subscribe(o),
30+
connection = Disposable.empty;
31+
32+
var pausable = distinctUntilChanged(this.pauser).subscribe(function (b) {
33+
if (b) {
34+
connection = conn.connect();
35+
} else {
36+
connection.dispose();
37+
connection = Disposable.empty;
38+
}
39+
});
40+
41+
return new NAryDisposable([subscription, connection, pausable]);
42+
};
43+
44+
PausableObservable.prototype.pause = function () {
45+
this.controller.onNext(false);
46+
};
47+
48+
PausableObservable.prototype.resume = function () {
49+
this.controller.onNext(true);
50+
};
51+
52+
module.exports = function pausable (source, pauser) {
53+
return new PausableObservable(source, pauser);
54+
};

0 commit comments

Comments
 (0)