Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit 3e2e075

Browse files
committed
multicast: don't reconnect once subject is stopped
A stopped subject shouldn't forward any more events, so using it to subscribe again is pointless at best and a cause of undesirable side effects at worst. Fixes #1112
1 parent d578883 commit 3e2e075

2 files changed

Lines changed: 52 additions & 1 deletion

File tree

src/core/linq/connectableobservable.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@
5555

5656
ConnectableObservable.prototype.connect = function () {
5757
if (!this._connection) {
58+
if (this._subject.isStopped) {
59+
return disposableEmpty;
60+
}
5861
var subscription = this._source.subscribe(this._subject);
5962
this._connection = new ConnectDisposable(this, subscription);
6063
}

tests/observable/multicast.js

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(function () {
22
'use strict';
33
/* jshint undef: true, unused: true */
4-
/* globals QUnit, test, Rx */
4+
/* globals QUnit, test, Rx, equal */
55

66
QUnit.module('multicast');
77

@@ -478,4 +478,52 @@
478478
subscribe(200, 390));
479479
});
480480

481+
// adapted from issue Reactive-Extensions/RxJS#1112
482+
test("multicast should not reconnect when stopped", function () {
483+
var scheduler = new TestScheduler();
484+
485+
var results = scheduler.createObserver();
486+
487+
var calls = 0;
488+
489+
function request(v) {
490+
calls++;
491+
return v * 2;
492+
}
493+
494+
var xs = scheduler.createColdObservable(
495+
onNext(1, 1),
496+
onNext(2, 2),
497+
onNext(3, 3),
498+
onCompleted(4));
499+
500+
var c, s;
501+
502+
scheduler.scheduleAbsolute(null, 300, function () {
503+
c = new Rx.AnonymousObservable(function (o) {
504+
return xs.subscribe(
505+
function (x) { o.onNext(request(x)); },
506+
function (e) { o.onError(e); },
507+
function () { o.onCompleted(); });
508+
}).multicast(new Subject());
509+
c.subscribe(results);
510+
s = c.connect();
511+
});
512+
513+
scheduler.scheduleAbsolute(null, 400, function () {
514+
s.dispose();
515+
c.connect();
516+
});
517+
518+
scheduler.start();
519+
520+
equal(calls, 3);
521+
522+
results.messages.assertEqual(
523+
onNext(301, 2),
524+
onNext(302, 4),
525+
onNext(303, 6),
526+
onCompleted(304));
527+
});
528+
481529
}());

0 commit comments

Comments
 (0)