Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit 6dac036

Browse files
Removing longrunning
1 parent f5e6be8 commit 6dac036

10 files changed

Lines changed: 56 additions & 88 deletions

File tree

src/modular/observable/from.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ function FromObservable(iterable, fn, scheduler) {
100100

101101
inherits(FromObservable, ObservableBase);
102102

103-
function createScheduleMethod(o, it, fn) {
103+
function scheduleRecursive(o, it, fn) {
104104
return function loopRecursive(i, recurse) {
105105
var next = tryCatch(it.next).call(it);
106106
if (next === errorObj) { return o.onError(next.e); }
@@ -122,7 +122,7 @@ FromObservable.prototype.subscribeCore = function (o) {
122122
var list = Object(this._iterable),
123123
it = getIterable(list);
124124

125-
return this._scheduler.scheduleRecursive(0, createScheduleMethod(o, it, this._fn));
125+
return this._scheduler.scheduleRecursive(0, scheduleRecursive(o, it, this._fn));
126126
};
127127

128128
/**

src/modular/observable/fromarray.js

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,13 @@ var ObservableBase = require('./observablebase');
44
var Scheduler = require('../scheduler');
55
var inherits = require('inherits');
66

7-
function scheduleMethod(o, args) {
8-
return function loopRecursive (i, recurse) {
9-
if (i < args.length) {
10-
o.onNext(args[i]);
11-
recurse(i + 1);
12-
} else {
13-
o.onCompleted();
14-
}
15-
};
7+
function scheduleRecursive(state, recurse) {
8+
if (state.i < state.len) {
9+
state.o.onNext(state.args[state.i++]);
10+
recurse(state);
11+
} else {
12+
state.o.onCompleted();
13+
}
1614
}
1715

1816
function FromArrayObservable(args, scheduler) {
@@ -24,7 +22,13 @@ function FromArrayObservable(args, scheduler) {
2422
inherits(FromArrayObservable, ObservableBase);
2523

2624
FromArrayObservable.prototype.subscribeCore = function (o) {
27-
return this._scheduler.scheduleRecursive(0, scheduleMethod(o, this._args));
25+
var state = {
26+
i: 0,
27+
args: this._args,
28+
len: this._args.length,
29+
o: o
30+
};
31+
return this._scheduler.scheduleRecursive(state, scheduleRecursive);
2832
};
2933

3034
module.exports = function fromArray(array, scheduler) {

src/modular/observable/range.js

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,6 @@ function RangeObservable(start, count, scheduler) {
1313

1414
inherits(RangeObservable, ObservableBase);
1515

16-
function loopLongRunning(start, count, o) {
17-
return function loop (i, cancel) {
18-
while (!cancel.isDisposed && i < count) {
19-
o.onNext(start + i);
20-
i++;
21-
}
22-
23-
if (!cancel.isDisposed) {
24-
o.onCompleted();
25-
}
26-
};
27-
}
28-
2916
function loopRecursive(start, count, o) {
3017
return function loop (i, recurse) {
3118
if (i < count) {
@@ -38,13 +25,6 @@ function loopRecursive(start, count, o) {
3825
}
3926

4027
RangeObservable.prototype.subscribeCore = function (o) {
41-
if (this.scheduler.scheduleLongRunning) {
42-
return this.scheduler.scheduleLongRunning(
43-
0,
44-
loopLongRunning(this.start, this.rangeCount, o)
45-
);
46-
}
47-
4828
return this.scheduler.scheduleRecursive(
4929
0,
5030
loopRecursive(this.start, this.rangeCount, o)
Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,35 @@
11
'use strict';
22

3-
var just = require('./just');
4-
var repeat = require('./repeat');
3+
var ObservableBase = require('./observablebase');
54
var Scheduler = require('../scheduler');
5+
var inherits = require('inherits');
66

7-
module.exports = function repeatValue(value, repeatCount, scheduler) {
7+
function RepeatValueObservable(value, count, scheduler) {
8+
this._value = value;
9+
this._count = count;
10+
this._scheduler = scheduler;
11+
ObservableBase.call(this);
12+
}
13+
14+
inherits(RepeatValueObservable, ObservableBase);
15+
16+
function scheduleRecursive(state, recurse) {
17+
if (state.n === 0) { return state.o.onCompleted(); }
18+
if (state.n > 0) { state.n--; }
19+
state.o.onNext(state.value);
20+
recurse(state);
21+
}
22+
23+
RepeatValueObservable.prototype.subscribeCore = function (o) {
24+
var state = {
25+
value: this._value,
26+
n: this._count,
27+
o: o
28+
};
29+
return this._scheduler.scheduleRecursive(state, scheduleRecursive);
30+
};
31+
32+
module.exports = function repeatValue (value, repeatCount, scheduler) {
833
Scheduler.isScheduler(scheduler) || (scheduler = Scheduler.queue);
9-
return repeat(just(value, scheduler), repeatCount);
34+
return new RepeatValueObservable(value, repeatCount, scheduler);
1035
};

src/modular/observable/takelast.js

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,25 +38,9 @@ function loopRecursive(state, recurse) {
3838
}
3939
}
4040

41-
function loopLongRunning(state, cancel) {
42-
var o = state[0], q = state[1], n = q.length;
43-
while(!cancel.isDisposed) {
44-
if (n === 0) {
45-
o.onCompleted();
46-
} else {
47-
o.onNext(q.shift());
48-
}
49-
n--;
50-
}
51-
}
52-
5341
TakeLastObserver.prototype.completed = function () {
5442
this._ss.dispose();
55-
if (this._s.scheduleLongRunning) {
56-
this._ls.setDisposable(this._s.scheduleLongRunning([this._o, this._q], loopLongRunning));
57-
} else {
58-
this._ls.setDisposable(this._s.scheduleRecursive([this._o, this._q], loopRecursive));
59-
}
43+
this._ls.setDisposable(this._s.scheduleRecursive([this._o, this._q], loopRecursive));
6044
};
6145

6246
function TakeLastObservable(source, count, scheduler) {

src/modular/scheduler/defaultscheduler.js

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,4 @@ DefaultScheduler.prototype._scheduleFuture = function (state, dueTime, action) {
169169
return new BinaryDisposable(disposable, new ClearDisposable(global.clearTimeout, id));
170170
};
171171

172-
function scheduleLongRunning(state, action, disposable) {
173-
return function () { action(state, disposable); };
174-
}
175-
176-
DefaultScheduler.prototype.scheduleLongRunning = function (state, action) {
177-
var disposable = Disposable.create(noop);
178-
var id = scheduleMethod(scheduleLongRunning(state, action, disposable));
179-
return new BinaryDisposable(disposable, new ClearDisposable(clearMethod, id));
180-
};
181-
182172
module.exports = DefaultScheduler;

src/modular/test/repeat.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ test('Observable#repeat value count zero', function (t) {
2929
});
3030

3131
reactiveAssert(t, results.messages, [
32-
onCompleted(200)
32+
onCompleted(201)
3333
]);
3434

3535
t.end();
@@ -44,7 +44,7 @@ test('Observable#repeat value count one', function (t) {
4444

4545
reactiveAssert(t, results.messages, [
4646
onNext(201, 42),
47-
onCompleted(201)
47+
onCompleted(202)
4848
]);
4949

5050
t.end();
@@ -68,7 +68,7 @@ test('Observable#repeat value count ten', function (t) {
6868
onNext(208, 42),
6969
onNext(209, 42),
7070
onNext(210, 42),
71-
onCompleted(210)
71+
onCompleted(211)
7272
]);
7373

7474
t.end();
@@ -121,7 +121,8 @@ test('Observable#repeat Observable basic', function (t) {
121121
onNext(100, 1),
122122
onNext(150, 2),
123123
onNext(200, 3),
124-
onCompleted(250));
124+
onCompleted(250)
125+
);
125126

126127
var results = scheduler.startScheduler(function () {
127128
return xs.repeat();

tests/perf/operators/fromarray.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
var RxOld = require('../old/rx.lite');
2-
var RxNew = require('../../../dist/rx.lite');
2+
var RxNew = require('../../../src/modular/.');
33
var Benchmark = require('benchmark');
44

55
var suite = new Benchmark.Suite;

tests/perf/operators/range.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
var RxOld = require('../old/rx.lite');
2-
var RxNew = require('../../../dist/rx.lite');
1+
var RxOld = require('../../../dist/rx.lite');
2+
var RxNew = require('../../../src/modular/.');
33
var Benchmark = require('benchmark');
44

55
var suite = new Benchmark.Suite;

tests/perf/operators/repeat.js

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,9 @@
1-
var RxOld = require('../old/rx.lite');
2-
var RxNew = require('../../../dist/rx.lite');
1+
var RxOld = require('../../../dist/rx.lite');
2+
var RxNew = require('../../../src/modular/.');
33
var Benchmark = require('benchmark');
44

55
var suite = new Benchmark.Suite;
66

7-
// Backfill range to get rid of differences
8-
RxOld.range = function (start, count) {
9-
var scheduler = RxNew.Scheduler.currentThread;
10-
return new RxNew.AnonymousObservable(function (observer) {
11-
return scheduler.scheduleRecursive(0, function (i, self) {
12-
if (i < count) {
13-
observer.onNext(start + i);
14-
self(i + 1);
15-
} else {
16-
observer.onCompleted();
17-
}
18-
});
19-
});
20-
};
21-
RxNew.range = RxOld.range;
22-
237
// add tests
248
suite.add('old', function() {
259
RxOld.Observable.repeat(42, 25).subscribe();

0 commit comments

Comments
 (0)