Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit c15bf5c

Browse files
Adding pausableBuffered
1 parent d578883 commit c15bf5c

10 files changed

Lines changed: 1000 additions & 28 deletions

File tree

src/modular/index.js

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Observer.addToObject({
99
var Observable = require('./observable');
1010

1111
Observable.addToObject({
12-
amb: require('./observable/amb'),
12+
race: require('./observable/race'),
1313
bindCallback: require('./observable/bindcallback'),
1414
bindNodeCallback: require('./observable/bindnodecallback'),
1515
case: require('./observable/case'),
@@ -50,7 +50,7 @@ Observable.addToObject({
5050
});
5151

5252
Observable.addToPrototype({
53-
amb: require('./observable/amb'),
53+
race: require('./observable/race'),
5454
and: require('./observable/and'),
5555
asObservable: require('./observable/asobservable'),
5656
average: require('./observable/average'),
@@ -104,6 +104,8 @@ Observable.addToPrototype({
104104
onErrorResumeNext: require('./observable/onerrorresumenext'),
105105
pairwise: require('./observable/pairwise'),
106106
partition: require('./observable/partition'),
107+
pausable: require('./observable/pausable'),
108+
pausableBuffered: require('./observable/pausablebuffered'),
107109
pluck: require('./observable/pluck'),
108110
publish: require('./observable/publish'),
109111
publishLast: require('./observable/publishlast'),
@@ -164,22 +166,32 @@ Subject.addToObject({
164166
});
165167

166168
var Rx = {
169+
// Disposables
167170
BinaryDisposable: require('./binarydisposable'),
168171
CompositeDisposable: require('./compositedisposable'),
169172
Disposable: require('./disposable'),
170173
NAryDisposable: require('./narydisposable'),
171174
SerialDisposable: require('./serialdisposable'),
172175
SingleAssignmentDisposable: require('./singleassignmentdisposable'),
173176

177+
// Schedulers
174178
Scheduler: require('./scheduler'),
179+
VirtualTimeScheduler: require('./scheduler/virtualtimescheduler'),
180+
HistoricalScheduler: require('./scheduler/historicalscheduler'),
175181

182+
// Core
176183
Observer: Observer,
177184
Observable: Observable,
178185

186+
// Subjects
179187
AsyncSubject: require('./asyncsubject'),
180188
BehaviorSubject: require('./behaviorsubject'),
181189
ReplaySubject: require('./replaysubject'),
182-
Subject: Subject
190+
Subject: Subject,
191+
192+
// Testing
193+
reactiveTest: require('./testing/reactivetest'),
194+
TestScheduler: require('./testing/testscheduler')
183195
};
184196

185197
module.exports = Rx;

src/modular/observable/pausable.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ var Observable = require('../observable');
44
var distinctUntilChanged = require('./distinctuntilchanged');
55
var merge = require('./merge');
66
var publish = require('./publish');
7+
var startWith = require('./startwith');
78
var Subject = require('../subject');
89
var Disposable = require('../disposable');
910
var NAryDisposable = require('../narydisposable');
@@ -29,7 +30,7 @@ PausableObservable.prototype._subscribe = function (o) {
2930
subscription = conn.subscribe(o),
3031
connection = Disposable.empty;
3132

32-
var pausable = distinctUntilChanged(this.pauser).subscribe(function (b) {
33+
var pausable = startWith(distinctUntilChanged(this.pauser), !this.paused).subscribe(function (b) {
3334
if (b) {
3435
connection = conn.connect();
3536
} else {

src/modular/observable/pausablebuffered.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ PausableBufferedObserver.prototype.completed = function () {
131131
function PausableBufferedObservable(source, pauser) {
132132
this.source = source;
133133
this.controller = new Subject();
134+
this.paused = true;
134135

135136
if (pauser && pauser.subscribe) {
136137
this.pauser = merge(this.controller, pauser);
@@ -151,17 +152,19 @@ PausableBufferedObservable.prototype._subscribe = function (o) {
151152

152153
return combineLatestSource(
153154
this.source,
154-
distinctUntilChanged(startWith(this.pauser, false)),
155+
distinctUntilChanged(startWith(this.pauser, !this.paused)),
155156
selectorFn)
156157
.subscribe(new PausableBufferedObserver(o));
157158
};
158159

159160
PausableBufferedObservable.prototype.pause = function () {
160161
this.controller.onNext(false);
162+
this.paused = true;
161163
};
162164

163165
PausableBufferedObservable.prototype.resume = function () {
164166
this.controller.onNext(true);
167+
this.paused = false;
165168
};
166169

167170
module.exports = function pausableBuffered (source, pauser) {
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
'use strict';
2+
3+
var VirtualTimeScheduler = require('./virtualtimescheduler');
4+
var inherits = require('inherits');
5+
6+
function baseComparer(x, y) { return x > y ? 1 : (x < y ? -1 : 0); }
7+
8+
/**
9+
* Provides a virtual time scheduler that uses Date for absolute time and number for relative time.
10+
* Creates a new historical scheduler with the specified initial clock value.
11+
* @constructor
12+
* @param {Number} initialClock Initial value for the clock.
13+
* @param {Function} comparer Comparer to determine causality of events based on absolute time.
14+
*/
15+
function HistoricalScheduler(initialClock, comparer) {
16+
var clock = initialClock == null ? 0 : initialClock;
17+
var cmp = comparer || baseComparer;
18+
VirtualTimeScheduler.call(this, clock, cmp);
19+
}
20+
21+
inherits(HistoricalScheduler, VirtualTimeScheduler);
22+
23+
/**
24+
* Adds a relative time value to an absolute time value.
25+
* @param {Number} absolute Absolute virtual time value.
26+
* @param {Number} relative Relative virtual time value to add.
27+
* @return {Number} Resulting absolute virtual time sum value.
28+
*/
29+
HistoricalScheduler.prototype.add = function (absolute, relative) {
30+
return absolute + relative;
31+
};
32+
33+
HistoricalScheduler.prototype.toAbsoluteTime = function (absolute) {
34+
return new Date(absolute).getTime();
35+
};
36+
37+
/**
38+
* Converts the TimeSpan value to a relative virtual time value.
39+
* @memberOf HistoricalScheduler
40+
* @param {Number} timeSpan TimeSpan value to convert.
41+
* @return {Number} Corresponding relative virtual time value.
42+
*/
43+
HistoricalScheduler.prototype.toRelativeTime = function (timeSpan) {
44+
return timeSpan;
45+
};
46+
47+
module.exports = HistoricalScheduler;

0 commit comments

Comments
 (0)