Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit 5fc6c6d

Browse files
startWith
1 parent 2cceb47 commit 5fc6c6d

4 files changed

Lines changed: 218 additions & 0 deletions

File tree

src/core/concurrency/defaultscheduler.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,16 @@
194194
return new BinaryDisposable(disposable, new LocalClearDisposable(id));
195195
};
196196

197+
function scheduleLongRunning(state, action, disposable) {
198+
return function () { action(state, disposable); };
199+
}
200+
201+
DefaultScheduler.prototype.scheduleLongRunning = function (state, action) {
202+
var disposable = disposableCreate(noop);
203+
scheduleMethod(scheduleLongRunning(state, action, disposable));
204+
return disposable;
205+
};
206+
197207
return DefaultScheduler;
198208
}(Scheduler));
199209

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
'use strict';
2+
3+
var concat = require('./concat');
4+
var fromArray = require('./fromarray');
5+
var isScheduler = require('../scheduler').isScheduler;
6+
7+
global._Rx || (global._Rx = {});
8+
if (!global._Rx.immediateScheduler) {
9+
require('../scheduler/immediatescheduler');
10+
}
11+
12+
module.exports = function startWith () {
13+
var source = arguments[0], scheduler, start = 1;
14+
if (isScheduler(arguments[1])) {
15+
scheduler = arguments[1];
16+
start = 2;
17+
} else {
18+
scheduler = global._Rx.immediateScheduler;
19+
}
20+
for(var args = [], i = start, len = arguments.length; i < len; i++) { args.push(arguments[i]); }
21+
return concat(fromArray(args, scheduler), source);
22+
};

src/modular/scheduler/defaultscheduler.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ var BinaryDisposable = require('../binarydisposable');
55
var SingleAssignmentDisposable = require('../singleassignmentdisposable');
66
var Scheduler = require('../scheduler');
77
var isFunction = require('../helpers/isfunction');
8+
var noop = require('../helpers/noop');
89
var tryCatchUtils = require('../internal/trycatchutils');
910
var tryCatch = tryCatchUtils.tryCatch, thrower = tryCatchUtils.thrower;
1011
var inherits = require('inherits');
@@ -168,6 +169,16 @@ DefaultScheduler.prototype._scheduleFuture = function (state, dueTime, action) {
168169
return new BinaryDisposable(disposable, new ClearDisposable(global.clearTimeout, id));
169170
};
170171

172+
function scheduleLongRunning(state, action, disposable) {
173+
return function () { action(state, disposable); };
174+
}
175+
176+
DefaultScheduler.prototype.scheduleLongRunning = function (state, action) {
177+
var disposable = Disposable.create(noop);
178+
scheduleMethod(scheduleLongRunning(state, action, disposable));
179+
return disposable;
180+
};
181+
171182
global._Rx || (global._Rx = {});
172183
global._Rx.defaultScheduler = new DefaultScheduler();
173184

src/modular/test/startwith.js

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
'use strict';
2+
3+
var test = require('tape');
4+
var Observable = require('../observable');
5+
var TestScheduler = require('../testing/testscheduler');
6+
var reactiveAssert = require('../testing/reactiveassert');
7+
var ReactiveTest = require('../testing/reactivetest');
8+
var onNext = ReactiveTest.onNext,
9+
onError = ReactiveTest.onError,
10+
onCompleted = ReactiveTest.onCompleted;
11+
12+
Observable.addToPrototype({
13+
startWith: require('../observable/startwith')
14+
});
15+
16+
global._Rx || (global._Rx = {});
17+
if (!global._Rx.currentThreadScheduler) {
18+
require('../scheduler/currentthreadscheduler');
19+
}
20+
21+
test('Observable#startWith normal', function (t) {
22+
var scheduler = new TestScheduler();
23+
24+
var xs = scheduler.createHotObservable(
25+
onNext(150, 1),
26+
onNext(220, 2),
27+
onCompleted(250)
28+
);
29+
30+
var results = scheduler.startScheduler(function () {
31+
return xs.startWith(1);
32+
});
33+
34+
reactiveAssert(t, results.messages, [
35+
onNext(200, 1),
36+
onNext(220, 2),
37+
onCompleted(250)
38+
]);
39+
40+
t.end();
41+
});
42+
43+
test('Observable#startWith never', function (t) {
44+
var scheduler = new TestScheduler();
45+
46+
var xs = scheduler.createHotObservable(
47+
onNext(150, 1)
48+
);
49+
50+
var results = scheduler.startScheduler(function () {
51+
return xs.startWith(scheduler, 1);
52+
});
53+
54+
reactiveAssert(t, results.messages, [
55+
onNext(201, 1)
56+
]);
57+
58+
t.end();
59+
});
60+
61+
test('Observable#startWith empty', function (t) {
62+
var scheduler = new TestScheduler();
63+
64+
var xs = scheduler.createHotObservable(
65+
onNext(150, 1),
66+
onCompleted(250)
67+
);
68+
69+
var results = scheduler.startScheduler(function () {
70+
return xs.startWith(scheduler, 1);
71+
});
72+
73+
reactiveAssert(t, results.messages, [
74+
onNext(201, 1),
75+
onCompleted(250)
76+
]);
77+
78+
t.end();
79+
});
80+
81+
test('Observable#startWith one', function (t) {
82+
var scheduler = new TestScheduler();
83+
84+
var xs = scheduler.createHotObservable(
85+
onNext(150, 1),
86+
onNext(220, 2),
87+
onCompleted(250)
88+
);
89+
90+
var results = scheduler.startScheduler(function () {
91+
return xs.startWith(scheduler, 1);
92+
});
93+
94+
reactiveAssert(t, results.messages, [
95+
onNext(201, 1),
96+
onNext(220, 2),
97+
onCompleted(250)
98+
]);
99+
100+
t.end();
101+
});
102+
103+
test('Observable#startWith multiple', function (t) {
104+
var scheduler = new TestScheduler();
105+
106+
var xs = scheduler.createHotObservable(
107+
onNext(150, 1),
108+
onNext(220, 4),
109+
onCompleted(250)
110+
);
111+
112+
var results = scheduler.startScheduler(function () {
113+
return xs.startWith(scheduler, 1, 2, 3);
114+
});
115+
116+
reactiveAssert(t, results.messages, [
117+
onNext(201, 1),
118+
onNext(202, 2),
119+
onNext(203, 3),
120+
onNext(220, 4),
121+
onCompleted(250)
122+
]);
123+
124+
t.end();
125+
});
126+
127+
test('Observable#startWith error', function (t) {
128+
var error = new Error();
129+
130+
var scheduler = new TestScheduler();
131+
132+
var xs = scheduler.createHotObservable(
133+
onNext(150, 1),
134+
onError(250, error)
135+
);
136+
137+
var results = scheduler.startScheduler(function () {
138+
return xs.startWith(scheduler, 1, 2, 3);
139+
});
140+
141+
reactiveAssert(t, results.messages, [
142+
onNext(201, 1),
143+
onNext(202, 2),
144+
onNext(203, 3),
145+
onError(250, error)
146+
]);
147+
148+
t.end();
149+
});
150+
151+
test('Observable#startWith is unaffected by currentThread scheduler', function (t) {
152+
var scheduler = new TestScheduler();
153+
154+
var xs = scheduler.createHotObservable(
155+
onNext(150, 1),
156+
onNext(220, 2),
157+
onCompleted(250)
158+
);
159+
160+
var results;
161+
162+
global._Rx.currentThreadScheduler.schedule(null, function () {
163+
results = scheduler.startScheduler(function () {
164+
return xs.startWith(scheduler, 1);
165+
});
166+
});
167+
168+
reactiveAssert(t, results.messages, [
169+
onNext(201, 1),
170+
onNext(220, 2),
171+
onCompleted(250)
172+
]);
173+
174+
t.end();
175+
});

0 commit comments

Comments
 (0)