Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit c3f9d87

Browse files
committed
pausable[Buffered]: remember pause state
...across multiple subscriptions.
1 parent 4f96711 commit c3f9d87

4 files changed

Lines changed: 196 additions & 2 deletions

File tree

src/core/backpressure/pausable.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
function PausableObservable(source, pauser) {
44
this.source = source;
55
this.controller = new Subject();
6+
this.paused = true;
67

78
if (pauser && pauser.subscribe) {
89
this.pauser = this.controller.merge(pauser);
@@ -18,7 +19,7 @@
1819
subscription = conn.subscribe(o),
1920
connection = disposableEmpty;
2021

21-
var pausable = this.pauser.distinctUntilChanged().subscribe(function (b) {
22+
var pausable = this.pauser.startWith(!this.paused).distinctUntilChanged().subscribe(function (b) {
2223
if (b) {
2324
connection = conn.connect();
2425
} else {
@@ -31,10 +32,12 @@
3132
};
3233

3334
PausableObservable.prototype.pause = function () {
35+
this.paused = true;
3436
this.controller.onNext(false);
3537
};
3638

3739
PausableObservable.prototype.resume = function () {
40+
this.paused = false;
3841
this.controller.onNext(true);
3942
};
4043

src/core/backpressure/pausablebuffered.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
function PausableBufferedObservable(source, pauser) {
5353
this.source = source;
5454
this.controller = new Subject();
55+
this.paused = true;
5556

5657
if (pauser && pauser.subscribe) {
5758
this.pauser = this.controller.merge(pauser);
@@ -70,7 +71,7 @@
7071
var subscription =
7172
combineLatestSource(
7273
this.source,
73-
this.pauser.startWith(false).distinctUntilChanged(),
74+
this.pauser.startWith(!this.paused).distinctUntilChanged(),
7475
function (data, shouldFire) {
7576
return { data: data, shouldFire: shouldFire };
7677
})
@@ -103,10 +104,12 @@
103104
};
104105

105106
PausableBufferedObservable.prototype.pause = function () {
107+
this.paused = true;
106108
this.controller.onNext(false);
107109
};
108110

109111
PausableBufferedObservable.prototype.resume = function () {
112+
this.paused = false;
110113
this.controller.onNext(true);
111114
};
112115

tests/observable/pausable.js

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,4 +201,98 @@
201201
);
202202
});
203203

204+
test('paused with default controller and multiple subscriptions', function () {
205+
var paused, subscription, subscription2;
206+
207+
var scheduler = new TestScheduler();
208+
209+
var results = scheduler.createObserver();
210+
var results2 = scheduler.createObserver();
211+
212+
var xs = scheduler.createHotObservable(
213+
onNext(150, 1),
214+
onNext(210, 2),
215+
onNext(230, 3),
216+
onNext(301, 4),
217+
onNext(350, 5),
218+
onNext(399, 6),
219+
onCompleted(500)
220+
);
221+
222+
scheduler.scheduleAbsolute(null, 200, function () {
223+
paused = xs.pausable();
224+
subscription = paused.subscribe(results);
225+
paused.resume();
226+
});
227+
228+
scheduler.scheduleAbsolute(null, 240, function () {
229+
subscription2 = paused.subscribe(results2);
230+
});
231+
232+
scheduler.scheduleAbsolute(null, 1000, function () {
233+
subscription.dispose();
234+
subscription2.dispose();
235+
});
236+
237+
scheduler.start();
238+
239+
results.messages.assertEqual(
240+
onNext(210, 2),
241+
onNext(230, 3),
242+
onNext(301, 4),
243+
onNext(350, 5),
244+
onNext(399, 6),
245+
onCompleted(500)
246+
);
247+
248+
results2.messages.assertEqual(
249+
onNext(301, 4),
250+
onNext(350, 5),
251+
onNext(399, 6),
252+
onCompleted(500)
253+
);
254+
});
255+
256+
test('pausable is unaffected by currentThread scheduler', function () {
257+
var subscription;
258+
259+
var scheduler = new TestScheduler();
260+
261+
var controller = new Subject();
262+
263+
var results = scheduler.createObserver();
264+
265+
var xs = scheduler.createHotObservable(
266+
onNext(150, 1),
267+
onNext(210, 2),
268+
onNext(230, 3),
269+
onNext(301, 4),
270+
onNext(350, 5),
271+
onNext(399, 6),
272+
onCompleted(500)
273+
);
274+
275+
scheduler.scheduleAbsolute(null, 200, function () {
276+
Rx.Scheduler.currentThread.schedule(null, function () {
277+
subscription = xs.pausable(controller).subscribe(results);
278+
controller.onNext(true);
279+
});
280+
});
281+
282+
scheduler.scheduleAbsolute(null, 1000, function () {
283+
subscription.dispose();
284+
});
285+
286+
scheduler.start();
287+
288+
results.messages.assertEqual(
289+
onNext(210, 2),
290+
onNext(230, 3),
291+
onNext(301, 4),
292+
onNext(350, 5),
293+
onNext(399, 6),
294+
onCompleted(500)
295+
);
296+
});
297+
204298
}());

tests/observable/pausablebuffered.js

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,4 +461,98 @@
461461
QUnit.deepEqual(results, [1]);
462462
});
463463

464+
test('paused with default controller and multiple subscriptions', function () {
465+
var paused, subscription, subscription2;
466+
467+
var scheduler = new TestScheduler();
468+
469+
var results = scheduler.createObserver();
470+
var results2 = scheduler.createObserver();
471+
472+
var xs = scheduler.createHotObservable(
473+
onNext(150, 1),
474+
onNext(210, 2),
475+
onNext(230, 3),
476+
onNext(301, 4),
477+
onNext(350, 5),
478+
onNext(399, 6),
479+
onCompleted(500)
480+
);
481+
482+
scheduler.scheduleAbsolute(null, 200, function () {
483+
paused = xs.pausableBuffered();
484+
subscription = paused.subscribe(results);
485+
paused.resume();
486+
});
487+
488+
scheduler.scheduleAbsolute(null, 240, function () {
489+
subscription2 = paused.subscribe(results2);
490+
});
491+
492+
scheduler.scheduleAbsolute(null, 1000, function () {
493+
subscription.dispose();
494+
subscription2.dispose();
495+
});
496+
497+
scheduler.start();
498+
499+
results.messages.assertEqual(
500+
onNext(210, 2),
501+
onNext(230, 3),
502+
onNext(301, 4),
503+
onNext(350, 5),
504+
onNext(399, 6),
505+
onCompleted(500)
506+
);
507+
508+
results2.messages.assertEqual(
509+
onNext(301, 4),
510+
onNext(350, 5),
511+
onNext(399, 6),
512+
onCompleted(500)
513+
);
514+
});
515+
516+
test('pausableBuffered is unaffected by currentThread scheduler', function () {
517+
var subscription;
518+
519+
var scheduler = new TestScheduler();
520+
521+
var controller = new Subject();
522+
523+
var results = scheduler.createObserver();
524+
525+
var xs = scheduler.createHotObservable(
526+
onNext(150, 1),
527+
onNext(210, 2),
528+
onNext(230, 3),
529+
onNext(301, 4),
530+
onNext(350, 5),
531+
onNext(399, 6),
532+
onCompleted(500)
533+
);
534+
535+
scheduler.scheduleAbsolute(null, 200, function () {
536+
Rx.Scheduler.currentThread.schedule(null, function () {
537+
subscription = xs.pausableBuffered(controller).subscribe(results);
538+
controller.onNext(true);
539+
});
540+
});
541+
542+
scheduler.scheduleAbsolute(null, 1000, function () {
543+
subscription.dispose();
544+
});
545+
546+
scheduler.start();
547+
548+
results.messages.assertEqual(
549+
onNext(210, 2),
550+
onNext(230, 3),
551+
onNext(301, 4),
552+
onNext(350, 5),
553+
onNext(399, 6),
554+
onCompleted(500)
555+
);
556+
});
557+
464558
}());

0 commit comments

Comments
 (0)