Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit c4d0e9a

Browse files
windowed/stopAndWait
1 parent 68d92e2 commit c4d0e9a

3 files changed

Lines changed: 369 additions & 3 deletions

File tree

src/modular/observable/stopandwait.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ var AbstractObserver = require('../observer/abstractobserver');
55
var BinaryDisposable = require('../binarydisposable');
66
var inherits = require('inherits');
77

8-
function StopAndWaitObserver (observer, observable, scheduler, cancel) {
8+
function StopAndWaitObserver(observer, observable, scheduler, cancel) {
99
this.observer = observer;
1010
this.observable = observable;
1111
this.scheduler = scheduler;
@@ -14,6 +14,8 @@ function StopAndWaitObserver (observer, observable, scheduler, cancel) {
1414
AbstractObserver.call(this);
1515
}
1616

17+
inherits(StopAndWaitObserver, AbstractObserver);
18+
1719
StopAndWaitObserver.prototype.completed = function () {
1820
this.observer.onCompleted();
1921
this.dispose();
@@ -33,7 +35,7 @@ StopAndWaitObserver.prototype.next = function (value) {
3335
this.scheduleDisposable = this.scheduler.schedule(this, innerScheduleMethod);
3436
};
3537

36-
StopAndWaitObserver.dispose = function () {
38+
StopAndWaitObserver.prototype.dispose = function () {
3739
this.observer = null;
3840
if (this.cancel) {
3941
this.cancel.dispose();
@@ -46,7 +48,7 @@ StopAndWaitObserver.dispose = function () {
4648
AbstractObserver.prototype.dispose.call(this);
4749
};
4850

49-
function StopAndWaitObservable (source, scheduler) {
51+
function StopAndWaitObservable(source, scheduler) {
5052
this.source = source;
5153
this.scheduler = scheduler;
5254
Observable.call(this);

src/modular/test/stopandwait.js

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
'use strict';
2+
3+
var test = require('tape');
4+
var Observable = require('../observable');
5+
var TestScheduler = require('../testing/testscheduler');
6+
var reactiveAssert = require('../testing/reactiveassert');
7+
var ReactiveTest = require('../testing/reactivetest');
8+
var onNext = ReactiveTest.onNext,
9+
onError = ReactiveTest.onError,
10+
onCompleted = ReactiveTest.onCompleted,
11+
subscribe = ReactiveTest.subscribe;
12+
13+
Observable.addToPrototype({
14+
controlled: require('../observable/controlled')
15+
});
16+
17+
test('ControlledObservable#stopAndWait never', function (t) {
18+
var scheduler = new TestScheduler();
19+
20+
var xs = scheduler.createHotObservable(
21+
onNext(150, 1)
22+
);
23+
24+
var results = scheduler.startScheduler(function () {
25+
return xs.controlled(true, scheduler).stopAndWait(scheduler);
26+
});
27+
28+
reactiveAssert(t, results.messages, [
29+
30+
]);
31+
32+
reactiveAssert(t, xs.subscriptions, [
33+
subscribe(200, 1000)
34+
]);
35+
36+
t.end();
37+
});
38+
39+
test('ControlledObservable#stopAndWait empty', function (t) {
40+
var scheduler = new TestScheduler();
41+
42+
var xs = scheduler.createHotObservable(
43+
onNext(150, 1),
44+
onCompleted(250)
45+
);
46+
47+
var results = scheduler.startScheduler(function () {
48+
return xs.controlled(true, scheduler).stopAndWait(scheduler);
49+
});
50+
51+
reactiveAssert(t, results.messages, [
52+
onCompleted(250)
53+
]);
54+
55+
reactiveAssert(t, xs.subscriptions, [
56+
subscribe(200, 250)
57+
]);
58+
59+
t.end();
60+
});
61+
62+
test('ControlledObservable#stopAndWait basic', function (t) {
63+
var scheduler = new TestScheduler();
64+
65+
var xs = scheduler.createHotObservable(
66+
onNext(150, 1),
67+
onNext(210, 2),
68+
onNext(220, 3),
69+
onNext(230, 4),
70+
onNext(240, 5),
71+
onCompleted(250)
72+
);
73+
74+
var results = scheduler.startScheduler(function () {
75+
return xs.controlled(true, scheduler).stopAndWait(scheduler);
76+
});
77+
78+
reactiveAssert(t, results.messages, [
79+
onNext(210, 2),
80+
onNext(220, 3),
81+
onNext(230, 4),
82+
onNext(240, 5),
83+
onCompleted(250)
84+
]);
85+
86+
reactiveAssert(t, xs.subscriptions, [
87+
subscribe(200, 250)
88+
]);
89+
90+
t.end();
91+
});
92+
93+
test('ControlledObservable#stopAndWait error', function (t) {
94+
var error = new Error();
95+
96+
var scheduler = new TestScheduler();
97+
98+
var xs = scheduler.createHotObservable(
99+
onNext(150, 1),
100+
onNext(210, 2),
101+
onNext(220, 3),
102+
onNext(230, 4),
103+
onNext(240, 5),
104+
onError(250, error)
105+
);
106+
107+
var results = scheduler.startScheduler(function () {
108+
return xs.controlled(true, scheduler).stopAndWait(scheduler);
109+
});
110+
111+
reactiveAssert(t, results.messages, [
112+
onNext(210, 2),
113+
onNext(220, 3),
114+
onNext(230, 4),
115+
onNext(240, 5),
116+
onError(250, error)
117+
]);
118+
119+
reactiveAssert(t, xs.subscriptions, [
120+
subscribe(200, 250)
121+
]);
122+
123+
t.end();
124+
});
125+
126+
test('ControlledObservable#stopAndWait infinite', function (t) {
127+
var scheduler = new TestScheduler();
128+
129+
var xs = scheduler.createHotObservable(
130+
onNext(150, 1),
131+
onNext(210, 2),
132+
onNext(220, 3),
133+
onNext(230, 4),
134+
onNext(240, 5)
135+
);
136+
137+
var results = scheduler.startScheduler(function () {
138+
return xs.controlled(true, scheduler).stopAndWait(scheduler);
139+
});
140+
141+
reactiveAssert(t, results.messages, [
142+
onNext(210, 2),
143+
onNext(220, 3),
144+
onNext(230, 4),
145+
onNext(240, 5)
146+
]);
147+
148+
reactiveAssert(t, xs.subscriptions, [
149+
subscribe(200, 1000)
150+
]);
151+
152+
t.end();
153+
});
154+
155+
test('ControlledObservable#windowed disposed', function (t) {
156+
var scheduler = new TestScheduler();
157+
158+
var xs = scheduler.createHotObservable(
159+
onNext(150, 1),
160+
onNext(210, 2),
161+
onNext(220, 3),
162+
onNext(230, 4),
163+
onNext(240, 5),
164+
onNext(250)
165+
);
166+
167+
var results = scheduler.startScheduler(function () {
168+
return xs.controlled(true, scheduler).stopAndWait(scheduler);
169+
}, { disposed: 235 });
170+
171+
reactiveAssert(t, results.messages, [
172+
onNext(210, 2),
173+
onNext(220, 3),
174+
onNext(230, 4)
175+
]);
176+
177+
reactiveAssert(t, xs.subscriptions, [
178+
subscribe(200, 235)
179+
]);
180+
181+
t.end();
182+
});

0 commit comments

Comments
 (0)